Merge branch 'develop' into clokep/db-upgrades

clokep/db-upgrades
Patrick Cloke 2023-10-23 09:12:42 -04:00 committed by GitHub
commit 91e65700bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
62 changed files with 629 additions and 290 deletions

View File

@ -47,7 +47,7 @@ if not IS_PR:
"database": "sqlite",
"extras": "all",
}
for version in ("3.9", "3.10", "3.11", "3.12.0-rc.2")
for version in ("3.9", "3.10", "3.11", "3.12")
)
trial_postgres_tests = [
@ -62,7 +62,7 @@ trial_postgres_tests = [
if not IS_PR:
trial_postgres_tests.append(
{
"python-version": "3.11",
"python-version": "3.12",
"database": "postgres",
"postgres-version": "16",
"extras": "all",

View File

@ -37,15 +37,18 @@ jobs:
- 'Cargo.toml'
- 'Cargo.lock'
- '.rustfmt.toml'
- '.github/workflows/tests.yml'
trial:
- 'synapse/**'
- 'tests/**'
- 'rust/**'
- '.ci/scripts/calculate_jobs.py'
- 'Cargo.toml'
- 'Cargo.lock'
- 'pyproject.toml'
- 'poetry.lock'
- '.github/workflows/tests.yml'
integration:
- 'synapse/**'
@ -56,7 +59,9 @@ jobs:
- 'pyproject.toml'
- 'poetry.lock'
- 'docker/**'
- '.ci/**'
- 'scripts-dev/complement.sh'
- '.github/workflows/tests.yml'
linting:
- 'synapse/**'
@ -70,6 +75,7 @@ jobs:
- 'mypy.ini'
- 'pyproject.toml'
- 'poetry.lock'
- '.github/workflows/tests.yml'
check-sampleconfig:
runs-on: ubuntu-latest

View File

@ -1,3 +1,51 @@
# Synapse 1.95.0rc1 (2023-10-17)
### Bugfixes
- Remove legacy unspecced `knock_state_events` field returned in some responses. ([\#16403](https://github.com/matrix-org/synapse/issues/16403))
- Fix a bug introduced in Synapse 1.81.0 where an `AttributeError` would be raised when `_matrix/client/v3/account/whoami` is called over a unix socket. Contributed by @Sir-Photch. ([\#16404](https://github.com/matrix-org/synapse/issues/16404))
- Properly return inline media when content types have parameters. ([\#16440](https://github.com/matrix-org/synapse/issues/16440))
- Prevent the purging of large rooms from timing out when Postgres is in use. The timeout which causes this issue was introduced in Synapse 1.88.0. ([\#16455](https://github.com/matrix-org/synapse/issues/16455))
- Improve the performance of purging rooms, particularly encrypted rooms. ([\#16457](https://github.com/matrix-org/synapse/issues/16457))
- Fix a bug introduced in Synapse 1.59.0 where servers could be incorrectly marked as available after an error response was received. ([\#16506](https://github.com/matrix-org/synapse/issues/16506))
### Improved Documentation
- Document internal background update mechanism. ([\#16420](https://github.com/matrix-org/synapse/issues/16420))
- Fix a typo in the sql for [useful SQL for admins document](https://matrix-org.github.io/synapse/latest/usage/administration/useful_sql_for_admins.html). ([\#16477](https://github.com/matrix-org/synapse/issues/16477))
### Internal Changes
- Bump pyo3 from 0.17.1 to 0.19.2. ([\#16162](https://github.com/matrix-org/synapse/issues/16162))
- Update registration of media repository URLs. ([\#16419](https://github.com/matrix-org/synapse/issues/16419))
- Improve type hints. ([\#16421](https://github.com/matrix-org/synapse/issues/16421), [\#16468](https://github.com/matrix-org/synapse/issues/16468), [\#16469](https://github.com/matrix-org/synapse/issues/16469), [\#16507](https://github.com/matrix-org/synapse/issues/16507))
- Refactor some code to simplify and better type receipts stream adjacent code. ([\#16426](https://github.com/matrix-org/synapse/issues/16426))
- Factor out `MultiWriter` token from `RoomStreamToken`. ([\#16427](https://github.com/matrix-org/synapse/issues/16427))
- Improve code comments. ([\#16428](https://github.com/matrix-org/synapse/issues/16428))
- Reduce memory allocations. ([\#16429](https://github.com/matrix-org/synapse/issues/16429), [\#16431](https://github.com/matrix-org/synapse/issues/16431), [\#16433](https://github.com/matrix-org/synapse/issues/16433), [\#16434](https://github.com/matrix-org/synapse/issues/16434), [\#16438](https://github.com/matrix-org/synapse/issues/16438), [\#16444](https://github.com/matrix-org/synapse/issues/16444))
- Remove unused method. ([\#16435](https://github.com/matrix-org/synapse/issues/16435))
- Improve rate limiting logic. ([\#16441](https://github.com/matrix-org/synapse/issues/16441))
- Do not block running of CI behind the check for sign-off on PRs. ([\#16454](https://github.com/matrix-org/synapse/issues/16454))
- Update the release script to remind releaser to check for special release notes. ([\#16461](https://github.com/matrix-org/synapse/issues/16461))
- Update complement.sh to match new public API shape. ([\#16466](https://github.com/matrix-org/synapse/issues/16466))
- Clean up logging on event persister endpoints. ([\#16488](https://github.com/matrix-org/synapse/issues/16488))
- Remove useless async job to delete device messages on sync, since we only deliver (and hence delete) up to 100 device messages at a time. ([\#16491](https://github.com/matrix-org/synapse/issues/16491))
### Updates to locked dependencies
* Bump bleach from 6.0.0 to 6.1.0. ([\#16451](https://github.com/matrix-org/synapse/issues/16451))
* Bump jsonschema from 4.19.0 to 4.19.1. ([\#16500](https://github.com/matrix-org/synapse/issues/16500))
* Bump netaddr from 0.8.0 to 0.9.0. ([\#16453](https://github.com/matrix-org/synapse/issues/16453))
* Bump packaging from 23.1 to 23.2. ([\#16497](https://github.com/matrix-org/synapse/issues/16497))
* Bump pillow from 10.0.1 to 10.1.0. ([\#16498](https://github.com/matrix-org/synapse/issues/16498))
* Bump psycopg2 from 2.9.8 to 2.9.9. ([\#16452](https://github.com/matrix-org/synapse/issues/16452))
* Bump pyo3-log from 0.8.3 to 0.8.4. ([\#16495](https://github.com/matrix-org/synapse/issues/16495))
* Bump ruff from 0.0.290 to 0.0.292. ([\#16449](https://github.com/matrix-org/synapse/issues/16449))
* Bump sentry-sdk from 1.31.0 to 1.32.0. ([\#16496](https://github.com/matrix-org/synapse/issues/16496))
* Bump serde from 1.0.188 to 1.0.189. ([\#16494](https://github.com/matrix-org/synapse/issues/16494))
* Bump types-bleach from 6.0.0.4 to 6.1.0.0. ([\#16450](https://github.com/matrix-org/synapse/issues/16450))
* Bump types-jsonschema from 4.17.0.10 to 4.19.0.3. ([\#16499](https://github.com/matrix-org/synapse/issues/16499))
# Synapse 1.94.0 (2023-10-10)
No significant changes since 1.94.0rc1.

View File

@ -1 +0,0 @@
Bump pyo3 from 0.17.1 to 0.19.2.

View File

@ -1 +0,0 @@
Remove legacy unspecced `knock_state_events` field returned in some responses.

View File

@ -1 +0,0 @@
Fixes possbile `AttributeError` when `_matrix/client/v3/account/whoami` is called over a unix socket. Contributed by @Sir-Photch.

View File

@ -1 +0,0 @@
Update registration of media repository URLs.

View File

@ -1 +0,0 @@
Document internal background update mechanism.

View File

@ -1 +0,0 @@
Improve type hints.

View File

@ -1 +0,0 @@
Refactor some code to simplify and better type receipts stream adjacent code.

View File

@ -1 +0,0 @@
Factor out `MultiWriter` token from `RoomStreamToken`.

View File

@ -1 +0,0 @@
Improve code comments.

View File

@ -1 +0,0 @@
Reduce memory allocations.

View File

@ -1 +0,0 @@
Reduce memory allocations.

View File

@ -1 +0,0 @@
Reduce memory allocations.

View File

@ -1 +0,0 @@
Reduce memory allocations.

View File

@ -1 +0,0 @@
Remove unused method.

View File

@ -1 +0,0 @@
Reduce memory allocations.

View File

@ -1 +0,0 @@
Properly return inline media when content types have parameters.

View File

@ -1 +0,0 @@
Improve rate limiting logic.

View File

@ -1 +0,0 @@
Reduce memory allocations.

View File

@ -1 +0,0 @@
Do not block running of CI behind the check for sign-off on PRs.

View File

@ -1 +0,0 @@
Prevent the purging of large rooms from timing out when Postgres is in use. The timeout which causes this issue was introduced in Synapse 1.88.0.

View File

@ -1 +0,0 @@
Improve the performance of purging rooms, particularly encrypted rooms.

View File

@ -1 +0,0 @@
Update the release script to remind releaser to check for special release notes.

View File

@ -1 +0,0 @@
Update complement.sh to match new public API shape.

View File

@ -1 +0,0 @@
Improve type hints.

View File

@ -1 +0,0 @@
Improve type hints.

View File

@ -1 +0,0 @@
Fix a typo in the sql for [useful SQL for admins document](https://matrix-org.github.io/synapse/latest/usage/administration/useful_sql_for_admins.html).

1
changelog.d/16485.bugfix Normal file
View File

@ -0,0 +1 @@
Fix long-standing bug where `/sync` incorrectly did not mark a room as `limited` in a sync requests when there were missing remote events.

View File

@ -1 +0,0 @@
Clean up logging on event persister endpoints.

View File

@ -1 +0,0 @@
Remove useless async job to delete device messages on sync, since we only deliver (and hence delete) up to 100 device messages at a time.

1
changelog.d/16492.misc Normal file
View File

@ -0,0 +1 @@
Improve performance of delete device messages query, cf issue [16479](https://github.com/matrix-org/synapse/issues/16479).

1
changelog.d/16510.misc Normal file
View File

@ -0,0 +1 @@
Improve replication performance when purging rooms.

1
changelog.d/16511.misc Normal file
View File

@ -0,0 +1 @@
Run tests against Python 3.12.

1
changelog.d/16512.misc Normal file
View File

@ -0,0 +1 @@
Run trial & integration tests in continuous integration when `.ci` directory is modified.

1
changelog.d/16521.misc Normal file
View File

@ -0,0 +1 @@
Stop deleting from an unused table.

1
changelog.d/16529.doc Normal file
View File

@ -0,0 +1 @@
Improve documentation of presence router.

1
changelog.d/16530.bugfix Normal file
View File

@ -0,0 +1 @@
Force TLS certificate verification in user registration script.

1
changelog.d/16531.doc Normal file
View File

@ -0,0 +1 @@
Add a sentence to the opentracing docs on how you can have jaeger in a different place than synapse.

1
changelog.d/16539.misc Normal file
View File

@ -0,0 +1 @@
Bump matrix-synapse-ldap3 from 0.2.2 to 0.3.0.

6
debian/changelog vendored
View File

@ -1,3 +1,9 @@
matrix-synapse-py3 (1.95.0~rc1) stable; urgency=medium
* New synapse release 1.95.0rc1.
-- Synapse Packaging team <packages@matrix.org> Tue, 17 Oct 2023 15:50:17 +0000
matrix-synapse-py3 (1.94.0) stable; urgency=medium
* New Synapse release 1.94.0.

View File

@ -1,8 +1,16 @@
# Presence router callbacks
Presence router callbacks allow module developers to specify additional users (local or remote)
to receive certain presence updates from local users. Presence router callbacks can be
registered using the module API's `register_presence_router_callbacks` method.
Presence router callbacks allow module developers to define additional users
which receive presence updates from local users. The additional users
can be local or remote.
For example, it could be used to direct all of `@alice:example.com` (a local user)'s
presence updates to `@bob:matrix.org` (a remote user), even though they don't share a
room. (Note that those presence updates might not make it to `@bob:matrix.org`'s client
unless a similar presence router is running on that homeserver.)
Presence router callbacks can be registered using the module API's
`register_presence_router_callbacks` method.
## Callbacks

View File

@ -51,6 +51,11 @@ docker run -d --name jaeger \
jaegertracing/all-in-one:1
```
By default, Synapse will publish traces to Jaeger on localhost.
If Jaeger is hosted elsewhere, point Synapse to the correct host by setting
`opentracing.jaeger_config.local_agent.reporting_host` [in the Synapse configuration](usage/configuration/config_documentation.md#opentracing-1)
or by setting the `JAEGER_AGENT_HOST` environment variable to the desired address.
Latest documentation is probably at
https://www.jaegertracing.io/docs/latest/getting-started.

115
poetry.lock generated
View File

@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand.
# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand.
[[package]]
name = "alabaster"
@ -162,33 +162,29 @@ lxml = ["lxml"]
[[package]]
name = "black"
version = "23.9.1"
version = "23.10.0"
description = "The uncompromising code formatter."
optional = false
python-versions = ">=3.8"
files = [
{file = "black-23.9.1-cp310-cp310-macosx_10_16_arm64.whl", hash = "sha256:d6bc09188020c9ac2555a498949401ab35bb6bf76d4e0f8ee251694664df6301"},
{file = "black-23.9.1-cp310-cp310-macosx_10_16_universal2.whl", hash = "sha256:13ef033794029b85dfea8032c9d3b92b42b526f1ff4bf13b2182ce4e917f5100"},
{file = "black-23.9.1-cp310-cp310-macosx_10_16_x86_64.whl", hash = "sha256:75a2dc41b183d4872d3a500d2b9c9016e67ed95738a3624f4751a0cb4818fe71"},
{file = "black-23.9.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:13a2e4a93bb8ca74a749b6974925c27219bb3df4d42fc45e948a5d9feb5122b7"},
{file = "black-23.9.1-cp310-cp310-win_amd64.whl", hash = "sha256:adc3e4442eef57f99b5590b245a328aad19c99552e0bdc7f0b04db6656debd80"},
{file = "black-23.9.1-cp311-cp311-macosx_10_16_arm64.whl", hash = "sha256:8431445bf62d2a914b541da7ab3e2b4f3bc052d2ccbf157ebad18ea126efb91f"},
{file = "black-23.9.1-cp311-cp311-macosx_10_16_universal2.whl", hash = "sha256:8fc1ddcf83f996247505db6b715294eba56ea9372e107fd54963c7553f2b6dfe"},
{file = "black-23.9.1-cp311-cp311-macosx_10_16_x86_64.whl", hash = "sha256:7d30ec46de88091e4316b17ae58bbbfc12b2de05e069030f6b747dfc649ad186"},
{file = "black-23.9.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:031e8c69f3d3b09e1aa471a926a1eeb0b9071f80b17689a655f7885ac9325a6f"},
{file = "black-23.9.1-cp311-cp311-win_amd64.whl", hash = "sha256:538efb451cd50f43aba394e9ec7ad55a37598faae3348d723b59ea8e91616300"},
{file = "black-23.9.1-cp38-cp38-macosx_10_16_arm64.whl", hash = "sha256:638619a559280de0c2aa4d76f504891c9860bb8fa214267358f0a20f27c12948"},
{file = "black-23.9.1-cp38-cp38-macosx_10_16_universal2.whl", hash = "sha256:a732b82747235e0542c03bf352c126052c0fbc458d8a239a94701175b17d4855"},
{file = "black-23.9.1-cp38-cp38-macosx_10_16_x86_64.whl", hash = "sha256:cf3a4d00e4cdb6734b64bf23cd4341421e8953615cba6b3670453737a72ec204"},
{file = "black-23.9.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:cf99f3de8b3273a8317681d8194ea222f10e0133a24a7548c73ce44ea1679377"},
{file = "black-23.9.1-cp38-cp38-win_amd64.whl", hash = "sha256:14f04c990259576acd093871e7e9b14918eb28f1866f91968ff5524293f9c573"},
{file = "black-23.9.1-cp39-cp39-macosx_10_16_arm64.whl", hash = "sha256:c619f063c2d68f19b2d7270f4cf3192cb81c9ec5bc5ba02df91471d0b88c4c5c"},
{file = "black-23.9.1-cp39-cp39-macosx_10_16_universal2.whl", hash = "sha256:6a3b50e4b93f43b34a9d3ef00d9b6728b4a722c997c99ab09102fd5efdb88325"},
{file = "black-23.9.1-cp39-cp39-macosx_10_16_x86_64.whl", hash = "sha256:c46767e8df1b7beefb0899c4a95fb43058fa8500b6db144f4ff3ca38eb2f6393"},
{file = "black-23.9.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:50254ebfa56aa46a9fdd5d651f9637485068a1adf42270148cd101cdf56e0ad9"},
{file = "black-23.9.1-cp39-cp39-win_amd64.whl", hash = "sha256:403397c033adbc45c2bd41747da1f7fc7eaa44efbee256b53842470d4ac5a70f"},
{file = "black-23.9.1-py3-none-any.whl", hash = "sha256:6ccd59584cc834b6d127628713e4b6b968e5f79572da66284532525a042549f9"},
{file = "black-23.9.1.tar.gz", hash = "sha256:24b6b3ff5c6d9ea08a8888f6977eae858e1f340d7260cf56d70a49823236b62d"},
{file = "black-23.10.0-cp310-cp310-macosx_10_16_arm64.whl", hash = "sha256:f8dc7d50d94063cdfd13c82368afd8588bac4ce360e4224ac399e769d6704e98"},
{file = "black-23.10.0-cp310-cp310-macosx_10_16_x86_64.whl", hash = "sha256:f20ff03f3fdd2fd4460b4f631663813e57dc277e37fb216463f3b907aa5a9bdd"},
{file = "black-23.10.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d3d9129ce05b0829730323bdcb00f928a448a124af5acf90aa94d9aba6969604"},
{file = "black-23.10.0-cp310-cp310-win_amd64.whl", hash = "sha256:960c21555be135c4b37b7018d63d6248bdae8514e5c55b71e994ad37407f45b8"},
{file = "black-23.10.0-cp311-cp311-macosx_10_16_arm64.whl", hash = "sha256:30b78ac9b54cf87bcb9910ee3d499d2bc893afd52495066c49d9ee6b21eee06e"},
{file = "black-23.10.0-cp311-cp311-macosx_10_16_x86_64.whl", hash = "sha256:0e232f24a337fed7a82c1185ae46c56c4a6167fb0fe37411b43e876892c76699"},
{file = "black-23.10.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:31946ec6f9c54ed7ba431c38bc81d758970dd734b96b8e8c2b17a367d7908171"},
{file = "black-23.10.0-cp311-cp311-win_amd64.whl", hash = "sha256:c870bee76ad5f7a5ea7bd01dc646028d05568d33b0b09b7ecfc8ec0da3f3f39c"},
{file = "black-23.10.0-cp38-cp38-macosx_10_16_arm64.whl", hash = "sha256:6901631b937acbee93c75537e74f69463adaf34379a04eef32425b88aca88a23"},
{file = "black-23.10.0-cp38-cp38-macosx_10_16_x86_64.whl", hash = "sha256:481167c60cd3e6b1cb8ef2aac0f76165843a374346aeeaa9d86765fe0dd0318b"},
{file = "black-23.10.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f74892b4b836e5162aa0452393112a574dac85e13902c57dfbaaf388e4eda37c"},
{file = "black-23.10.0-cp38-cp38-win_amd64.whl", hash = "sha256:47c4510f70ec2e8f9135ba490811c071419c115e46f143e4dce2ac45afdcf4c9"},
{file = "black-23.10.0-cp39-cp39-macosx_10_16_arm64.whl", hash = "sha256:76baba9281e5e5b230c9b7f83a96daf67a95e919c2dfc240d9e6295eab7b9204"},
{file = "black-23.10.0-cp39-cp39-macosx_10_16_x86_64.whl", hash = "sha256:a3c2ddb35f71976a4cfeca558848c2f2f89abc86b06e8dd89b5a65c1e6c0f22a"},
{file = "black-23.10.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:db451a3363b1e765c172c3fd86213a4ce63fb8524c938ebd82919bf2a6e28c6a"},
{file = "black-23.10.0-cp39-cp39-win_amd64.whl", hash = "sha256:7fb5fc36bb65160df21498d5a3dd330af8b6401be3f25af60c6ebfe23753f747"},
{file = "black-23.10.0-py3-none-any.whl", hash = "sha256:e223b731a0e025f8ef427dd79d8cd69c167da807f5710add30cdf131f13dd62e"},
{file = "black-23.10.0.tar.gz", hash = "sha256:31b9f87b277a68d0e99d2905edae08807c007973eaa609da5f0c62def6b7c0bd"},
]
[package.dependencies]
@ -600,20 +596,20 @@ smmap = ">=3.0.1,<6"
[[package]]
name = "gitpython"
version = "3.1.37"
version = "3.1.40"
description = "GitPython is a Python library used to interact with Git repositories"
optional = false
python-versions = ">=3.7"
files = [
{file = "GitPython-3.1.37-py3-none-any.whl", hash = "sha256:5f4c4187de49616d710a77e98ddf17b4782060a1788df441846bddefbb89ab33"},
{file = "GitPython-3.1.37.tar.gz", hash = "sha256:f9b9ddc0761c125d5780eab2d64be4873fc6817c2899cbcb34b02344bdc7bc54"},
{file = "GitPython-3.1.40-py3-none-any.whl", hash = "sha256:cf14627d5a8049ffbf49915732e5eddbe8134c3bdb9d476e6182b676fc573f8a"},
{file = "GitPython-3.1.40.tar.gz", hash = "sha256:22b126e9ffb671fdd0c129796343a02bf67bf2994b35449ffc9321aa755e18a4"},
]
[package.dependencies]
gitdb = ">=4.0.1,<5"
[package.extras]
test = ["black", "coverage[toml]", "ddt (>=1.1.1,!=1.4.3)", "mypy", "pre-commit", "pytest", "pytest-cov", "pytest-sugar"]
test = ["black", "coverage[toml]", "ddt (>=1.1.1,!=1.4.3)", "mock", "mypy", "pre-commit", "pytest", "pytest-cov", "pytest-instafail", "pytest-subtests", "pytest-sugar"]
[[package]]
name = "hiredis"
@ -1341,13 +1337,13 @@ test = ["aiounittest", "tox", "twisted"]
[[package]]
name = "matrix-synapse-ldap3"
version = "0.2.2"
version = "0.3.0"
description = "An LDAP3 auth provider for Synapse"
optional = true
python-versions = ">=3.7"
files = [
{file = "matrix-synapse-ldap3-0.2.2.tar.gz", hash = "sha256:b388d95693486eef69adaefd0fd9e84463d52fe17b0214a00efcaa669b73cb74"},
{file = "matrix_synapse_ldap3-0.2.2-py3-none-any.whl", hash = "sha256:66ee4c85d7952c6c27fd04c09cdfdf4847b8e8b7d6a7ada6ba1100013bda060f"},
{file = "matrix-synapse-ldap3-0.3.0.tar.gz", hash = "sha256:8bb6517173164d4b9cc44f49de411d8cebdb2e705d5dd1ea1f38733c4a009e1d"},
{file = "matrix_synapse_ldap3-0.3.0-py3-none-any.whl", hash = "sha256:8b4d701f8702551e98cc1d8c20dbed532de5613584c08d0df22de376ba99159d"},
]
[package.dependencies]
@ -1980,20 +1976,23 @@ typing-extensions = ">=4.6.0,<4.7.0 || >4.7.0"
[[package]]
name = "pygithub"
version = "1.59.1"
version = "2.1.1"
description = "Use the full Github API v3"
optional = false
python-versions = ">=3.7"
files = [
{file = "PyGithub-1.59.1-py3-none-any.whl", hash = "sha256:3d87a822e6c868142f0c2c4bf16cce4696b5a7a4d142a7bd160e1bdf75bc54a9"},
{file = "PyGithub-1.59.1.tar.gz", hash = "sha256:c44e3a121c15bf9d3a5cc98d94c9a047a5132a9b01d22264627f58ade9ddc217"},
{file = "PyGithub-2.1.1-py3-none-any.whl", hash = "sha256:4b528d5d6f35e991ea5fd3f942f58748f24938805cb7fcf24486546637917337"},
{file = "PyGithub-2.1.1.tar.gz", hash = "sha256:ecf12c2809c44147bce63b047b3d2e9dac8a41b63e90fcb263c703f64936b97c"},
]
[package.dependencies]
deprecated = "*"
Deprecated = "*"
pyjwt = {version = ">=2.4.0", extras = ["crypto"]}
pynacl = ">=1.4.0"
python-dateutil = "*"
requests = ">=2.14.0"
typing-extensions = ">=4.0.0"
urllib3 = ">=1.26.0"
[[package]]
name = "pygments"
@ -2137,7 +2136,7 @@ s2repoze = ["paste", "repoze.who", "zope.interface"]
name = "python-dateutil"
version = "2.8.2"
description = "Extensions to the standard Python datetime module"
optional = true
optional = false
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7"
files = [
{file = "python-dateutil-2.8.2.tar.gz", hash = "sha256:0123cacc1627ae19ddf3c27a5de5bd67ee4586fbdd6440d9748f8abb483d3e86"},
@ -3106,13 +3105,13 @@ files = [
[[package]]
name = "types-pillow"
version = "10.0.0.3"
version = "10.1.0.0"
description = "Typing stubs for Pillow"
optional = false
python-versions = "*"
python-versions = ">=3.7"
files = [
{file = "types-Pillow-10.0.0.3.tar.gz", hash = "sha256:ae0c877d363da349bbb82c5463c9e78037290cc07d3714cb0ceaf5d2f7f5c825"},
{file = "types_Pillow-10.0.0.3-py3-none-any.whl", hash = "sha256:54a49f3c6a3f5e95ebeee396d7773dde22ce2515d594f9c0596c0a983558f0d4"},
{file = "types-Pillow-10.1.0.0.tar.gz", hash = "sha256:0f5e7cf010ed226800cb5821e87781e5d0e81257d948a9459baa74a8c8b7d822"},
{file = "types_Pillow-10.1.0.0-py3-none-any.whl", hash = "sha256:f97f596b6a39ddfd26da3eb67421062193e10732d2310f33898d36f9694331b5"},
]
[[package]]
@ -3153,17 +3152,17 @@ files = [
[[package]]
name = "types-requests"
version = "2.31.0.2"
version = "2.31.0.10"
description = "Typing stubs for requests"
optional = false
python-versions = "*"
python-versions = ">=3.7"
files = [
{file = "types-requests-2.31.0.2.tar.gz", hash = "sha256:6aa3f7faf0ea52d728bb18c0a0d1522d9bfd8c72d26ff6f61bfc3d06a411cf40"},
{file = "types_requests-2.31.0.2-py3-none-any.whl", hash = "sha256:56d181c85b5925cbc59f4489a57e72a8b2166f18273fd8ba7b6fe0c0b986f12a"},
{file = "types-requests-2.31.0.10.tar.gz", hash = "sha256:dc5852a76f1eaf60eafa81a2e50aefa3d1f015c34cf0cba130930866b1b22a92"},
{file = "types_requests-2.31.0.10-py3-none-any.whl", hash = "sha256:b32b9a86beffa876c0c3ac99a4cd3b8b51e973fb8e3bd4e0a6bb32c7efad80fc"},
]
[package.dependencies]
types-urllib3 = "*"
urllib3 = ">=2"
[[package]]
name = "types-setuptools"
@ -3176,17 +3175,6 @@ files = [
{file = "types_setuptools-68.2.0.0-py3-none-any.whl", hash = "sha256:77edcc843e53f8fc83bb1a840684841f3dc804ec94562623bfa2ea70d5a2ba1b"},
]
[[package]]
name = "types-urllib3"
version = "1.26.25.8"
description = "Typing stubs for urllib3"
optional = false
python-versions = "*"
files = [
{file = "types-urllib3-1.26.25.8.tar.gz", hash = "sha256:ecf43c42d8ee439d732a1110b4901e9017a79a38daca26f08e42c8460069392c"},
{file = "types_urllib3-1.26.25.8-py3-none-any.whl", hash = "sha256:95ea847fbf0bf675f50c8ae19a665baedcf07e6b4641662c4c3c72e7b2edf1a9"},
]
[[package]]
name = "typing-extensions"
version = "4.8.0"
@ -3211,19 +3199,20 @@ files = [
[[package]]
name = "urllib3"
version = "1.26.17"
version = "2.0.7"
description = "HTTP library with thread-safe connection pooling, file post, and more."
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*"
python-versions = ">=3.7"
files = [
{file = "urllib3-1.26.17-py2.py3-none-any.whl", hash = "sha256:94a757d178c9be92ef5539b8840d48dc9cf1b2709c9d6b588232a055c524458b"},
{file = "urllib3-1.26.17.tar.gz", hash = "sha256:24d6a242c28d29af46c3fae832c36db3bbebcc533dd1bb549172cd739c82df21"},
{file = "urllib3-2.0.7-py3-none-any.whl", hash = "sha256:fdb6d215c776278489906c2f8916e6e7d4f5a9b602ccbcfdf7f016fc8da0596e"},
{file = "urllib3-2.0.7.tar.gz", hash = "sha256:c97dfde1f7bd43a71c8d2a58e369e9b2bf692d1334ea9f9cae55add7d0dd0f84"},
]
[package.extras]
brotli = ["brotli (==1.0.9)", "brotli (>=1.0.9)", "brotlicffi (>=0.8.0)", "brotlipy (>=0.6.0)"]
secure = ["certifi", "cryptography (>=1.3.4)", "idna (>=2.0.0)", "ipaddress", "pyOpenSSL (>=0.14)", "urllib3-secure-extra"]
socks = ["PySocks (>=1.5.6,!=1.5.7,<2.0)"]
brotli = ["brotli (>=1.0.9)", "brotlicffi (>=0.8.0)"]
secure = ["certifi", "cryptography (>=1.9)", "idna (>=2.0.0)", "pyopenssl (>=17.1.0)", "urllib3-secure-extra"]
socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"]
zstd = ["zstandard (>=0.18.0)"]
[[package]]
name = "webencodings"

View File

@ -96,7 +96,7 @@ module-name = "synapse.synapse_rust"
[tool.poetry]
name = "matrix-synapse"
version = "1.94.0"
version = "1.95.0rc1"
description = "Homeserver for the Matrix decentralised comms protocol"
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
license = "Apache-2.0"

View File

@ -50,7 +50,7 @@ def request_registration(
url = "%s/_synapse/admin/v1/register" % (server_location.rstrip("/"),)
# Get the nonce
r = requests.get(url, verify=False)
r = requests.get(url)
if r.status_code != 200:
_print("ERROR! Received %d %s" % (r.status_code, r.reason))
@ -88,7 +88,7 @@ def request_registration(
}
_print("Sending registration request...")
r = requests.post(url, json=data, verify=False)
r = requests.post(url, json=data)
if r.status_code != 200:
_print("ERROR! Received %d %s" % (r.status_code, r.reason))

View File

@ -14,17 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import (
TYPE_CHECKING,
Any,
Dict,
Iterable,
List,
Mapping,
Optional,
Set,
Tuple,
)
from typing import TYPE_CHECKING, Dict, Iterable, List, Mapping, Optional, Set, Tuple
from synapse.api import errors
from synapse.api.constants import EduTypes, EventTypes
@ -41,6 +31,7 @@ from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
)
from synapse.storage.databases.main.client_ips import DeviceLastConnectionInfo
from synapse.types import (
JsonDict,
JsonMapping,
@ -601,6 +592,8 @@ class DeviceHandler(DeviceWorkerHandler):
)
# Delete device messages asynchronously and in batches using the task scheduler
# We specify an upper stream id to avoid deleting non delivered messages
# if an user re-uses a device ID.
await self._task_scheduler.schedule_task(
DELETE_DEVICE_MSGS_TASK_NAME,
resource_id=device_id,
@ -1008,14 +1001,14 @@ class DeviceHandler(DeviceWorkerHandler):
def _update_device_from_client_ips(
device: JsonDict, client_ips: Mapping[Tuple[str, str], Mapping[str, Any]]
device: JsonDict, client_ips: Mapping[Tuple[str, str], DeviceLastConnectionInfo]
) -> None:
ip = client_ips.get((device["user_id"], device["device_id"]), {})
ip = client_ips.get((device["user_id"], device["device_id"]))
device.update(
{
"last_seen_user_agent": ip.get("user_agent"),
"last_seen_ts": ip.get("last_seen"),
"last_seen_ip": ip.get("ip"),
"last_seen_user_agent": ip.user_agent if ip else None,
"last_seen_ts": ip.last_seen if ip else None,
"last_seen_ip": ip.ip if ip else None,
}
)

View File

@ -500,12 +500,27 @@ class SyncHandler:
async def _load_filtered_recents(
self,
room_id: str,
sync_result_builder: "SyncResultBuilder",
sync_config: SyncConfig,
now_token: StreamToken,
upto_token: StreamToken,
since_token: Optional[StreamToken] = None,
potential_recents: Optional[List[EventBase]] = None,
newly_joined_room: bool = False,
) -> TimelineBatch:
"""Create a timeline batch for the room
Args:
room_id
sync_result_builder
sync_config
upto_token: The token up to which we should fetch (more) events.
If `potential_results` is non-empty then this is *start* of
the the list.
since_token
potential_recents: If non-empty, the events between the since token
and current token to send down to clients.
newly_joined_room
"""
with Measure(self.clock, "load_filtered_recents"):
timeline_limit = sync_config.filter_collection.timeline_limit()
block_all_timeline = (
@ -521,6 +536,20 @@ class SyncHandler:
else:
limited = False
# Check if there is a gap, if so we need to mark this as limited and
# recalculate which events to send down.
gap_token = await self.store.get_timeline_gaps(
room_id,
since_token.room_key if since_token else None,
sync_result_builder.now_token.room_key,
)
if gap_token:
# There's a gap, so we need to ignore the passed in
# `potential_recents`, and reset `upto_token` to match.
potential_recents = None
upto_token = sync_result_builder.now_token
limited = True
log_kv({"limited": limited})
if potential_recents:
@ -559,10 +588,10 @@ class SyncHandler:
recents = []
if not limited or block_all_timeline:
prev_batch_token = now_token
prev_batch_token = upto_token
if recents:
room_key = recents[0].internal_metadata.before
prev_batch_token = now_token.copy_and_replace(
prev_batch_token = upto_token.copy_and_replace(
StreamKeyType.ROOM, room_key
)
@ -573,11 +602,15 @@ class SyncHandler:
filtering_factor = 2
load_limit = max(timeline_limit * filtering_factor, 10)
max_repeat = 5 # Only try a few times per room, otherwise
room_key = now_token.room_key
room_key = upto_token.room_key
end_key = room_key
since_key = None
if since_token and not newly_joined_room:
if since_token and gap_token:
# If there is a gap then we need to only include events after
# it.
since_key = gap_token
elif since_token and not newly_joined_room:
since_key = since_token.room_key
while limited and len(recents) < timeline_limit and max_repeat:
@ -647,7 +680,7 @@ class SyncHandler:
recents = recents[-timeline_limit:]
room_key = recents[0].internal_metadata.before
prev_batch_token = now_token.copy_and_replace(StreamKeyType.ROOM, room_key)
prev_batch_token = upto_token.copy_and_replace(StreamKeyType.ROOM, room_key)
# Don't bother to bundle aggregations if the timeline is unlimited,
# as clients will have all the necessary information.
@ -662,7 +695,9 @@ class SyncHandler:
return TimelineBatch(
events=recents,
prev_batch=prev_batch_token,
limited=limited or newly_joined_room,
# Also mark as limited if this is a new room or there has been a gap
# (to force client to paginate the gap).
limited=limited or newly_joined_room or gap_token is not None,
bundled_aggregations=bundled_aggregations,
)
@ -2397,8 +2432,9 @@ class SyncHandler:
batch = await self._load_filtered_recents(
room_id,
sync_result_builder,
sync_config,
now_token=upto_token,
upto_token=upto_token,
since_token=since_token,
potential_recents=events,
newly_joined_room=newly_joined,

View File

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import heapq
from collections import defaultdict
from typing import TYPE_CHECKING, Iterable, Optional, Tuple, Type, TypeVar, cast
import attr
@ -51,8 +52,19 @@ data part are:
* The state_key of the state which has changed
* The event id of the new state
A "state-all" row is sent whenever the "current state" in a room changes, but there are
too many state updates for a particular room in the same update. This replaces any
"state" rows on a per-room basis. The fields in the data part are:
* The room id for the state changes
"""
# Any room with more than _MAX_STATE_UPDATES_PER_ROOM will send a EventsStreamAllStateRow
# instead of individual EventsStreamEventRow. This is predominantly useful when
# purging large rooms.
_MAX_STATE_UPDATES_PER_ROOM = 150
@attr.s(slots=True, frozen=True, auto_attribs=True)
class EventsStreamRow:
@ -111,9 +123,17 @@ class EventsStreamCurrentStateRow(BaseEventsStreamRow):
event_id: Optional[str]
@attr.s(slots=True, frozen=True, auto_attribs=True)
class EventsStreamAllStateRow(BaseEventsStreamRow):
TypeId = "state-all"
room_id: str
_EventRows: Tuple[Type[BaseEventsStreamRow], ...] = (
EventsStreamEventRow,
EventsStreamCurrentStateRow,
EventsStreamAllStateRow,
)
TypeToRow = {Row.TypeId: Row for Row in _EventRows}
@ -213,9 +233,28 @@ class EventsStream(Stream):
if stream_id <= upper_limit
)
# Separate out rooms that have many state updates, listeners should clear
# all state for those rooms.
state_updates_by_room = defaultdict(list)
for stream_id, room_id, _type, _state_key, _event_id in state_rows:
state_updates_by_room[room_id].append(stream_id)
state_all_rows = [
(stream_ids[-1], room_id)
for room_id, stream_ids in state_updates_by_room.items()
if len(stream_ids) >= _MAX_STATE_UPDATES_PER_ROOM
]
state_all_updates: Iterable[Tuple[int, Tuple]] = (
(max_stream_id, (EventsStreamAllStateRow.TypeId, (room_id,)))
for (max_stream_id, room_id) in state_all_rows
)
# Any remaining state updates are sent individually.
state_all_rooms = {room_id for _, room_id in state_all_rows}
state_updates: Iterable[Tuple[int, Tuple]] = (
(stream_id, (EventsStreamCurrentStateRow.TypeId, rest))
for (stream_id, *rest) in state_rows
if rest[0] not in state_all_rooms
)
ex_outliers_updates: Iterable[Tuple[int, Tuple]] = (
@ -224,7 +263,11 @@ class EventsStream(Stream):
)
# we need to return a sorted list, so merge them together.
updates = list(heapq.merge(event_updates, state_updates, ex_outliers_updates))
updates = list(
heapq.merge(
event_updates, state_all_updates, state_updates, ex_outliers_updates
)
)
return updates, upper_limit, limited
@classmethod

View File

@ -23,6 +23,7 @@ from synapse.metrics.background_process_metrics import wrap_as_background_proces
from synapse.replication.tcp.streams import BackfillStream, CachesStream
from synapse.replication.tcp.streams.events import (
EventsStream,
EventsStreamAllStateRow,
EventsStreamCurrentStateRow,
EventsStreamEventRow,
EventsStreamRow,
@ -264,6 +265,13 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
(data.state_key,)
)
self.get_rooms_for_user.invalidate((data.state_key,)) # type: ignore[attr-defined]
elif row.type == EventsStreamAllStateRow.TypeId:
assert isinstance(data, EventsStreamAllStateRow)
# Similar to the above, but the entire caches are invalidated. This is
# unfortunate for the membership caches, but should recover quickly.
self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined]
self.get_rooms_for_user_with_stream_ordering.invalidate_all() # type: ignore[attr-defined]
self.get_rooms_for_user.invalidate_all() # type: ignore[attr-defined]
else:
raise Exception("Unknown events stream row type %s" % (row.type,))

View File

@ -15,6 +15,7 @@
import logging
from typing import TYPE_CHECKING, Dict, List, Mapping, Optional, Tuple, Union, cast
import attr
from typing_extensions import TypedDict
from synapse.metrics.background_process_metrics import wrap_as_background_process
@ -42,7 +43,8 @@ logger = logging.getLogger(__name__)
LAST_SEEN_GRANULARITY = 120 * 1000
class DeviceLastConnectionInfo(TypedDict):
@attr.s(slots=True, frozen=True, auto_attribs=True)
class DeviceLastConnectionInfo:
"""Metadata for the last connection seen for a user and device combination"""
# These types must match the columns in the `devices` table
@ -499,24 +501,29 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore, MonthlyActiveUsersWorke
device_id: If None fetches all devices for the user
Returns:
A dictionary mapping a tuple of (user_id, device_id) to dicts, with
keys giving the column names from the devices table.
A dictionary mapping a tuple of (user_id, device_id) to DeviceLastConnectionInfo.
"""
keyvalues = {"user_id": user_id}
if device_id is not None:
keyvalues["device_id"] = device_id
res = cast(
List[DeviceLastConnectionInfo],
await self.db_pool.simple_select_list(
table="devices",
keyvalues=keyvalues,
retcols=("user_id", "ip", "user_agent", "device_id", "last_seen"),
),
res = await self.db_pool.simple_select_list(
table="devices",
keyvalues=keyvalues,
retcols=("user_id", "ip", "user_agent", "device_id", "last_seen"),
)
return {(d["user_id"], d["device_id"]): d for d in res}
return {
(d["user_id"], d["device_id"]): DeviceLastConnectionInfo(
user_id=d["user_id"],
device_id=d["device_id"],
ip=d["ip"],
user_agent=d["user_agent"],
last_seen=d["last_seen"],
)
for d in res
}
async def _get_user_ip_and_agents_from_database(
self, user: UserID, since_ts: int = 0
@ -683,8 +690,7 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore, MonthlyActiveUsersWorke
device_id: If None fetches all devices for the user
Returns:
A dictionary mapping a tuple of (user_id, device_id) to dicts, with
keys giving the column names from the devices table.
A dictionary mapping a tuple of (user_id, device_id) to DeviceLastConnectionInfo.
"""
ret = await self._get_last_client_ip_by_device_from_database(user_id, device_id)
@ -705,13 +711,13 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore, MonthlyActiveUsersWorke
continue
if not device_id or did == device_id:
ret[(user_id, did)] = {
"user_id": user_id,
"ip": ip,
"user_agent": user_agent,
"device_id": did,
"last_seen": last_seen,
}
ret[(user_id, did)] = DeviceLastConnectionInfo(
user_id=user_id,
ip=ip,
user_agent=user_agent,
device_id=did,
last_seen=last_seen,
)
return ret
async def get_user_ip_and_agents(

View File

@ -478,18 +478,19 @@ class DeviceInboxWorkerStore(SQLBaseStore):
log_kv({"message": "No changes in cache since last check"})
return 0
ROW_ID_NAME = self.database_engine.row_id_name
def delete_messages_for_device_txn(txn: LoggingTransaction) -> int:
limit_statement = "" if limit is None else f"LIMIT {limit}"
sql = f"""
DELETE FROM device_inbox WHERE {ROW_ID_NAME} IN (
SELECT {ROW_ID_NAME} FROM device_inbox
WHERE user_id = ? AND device_id = ? AND stream_id <= ?
{limit_statement}
DELETE FROM device_inbox WHERE user_id = ? AND device_id = ? AND stream_id <= (
SELECT MAX(stream_id) FROM (
SELECT stream_id FROM device_inbox
WHERE user_id = ? AND device_id = ? AND stream_id <= ?
ORDER BY stream_id
{limit_statement}
) AS q1
)
"""
txn.execute(sql, (user_id, device_id, up_to_stream_id))
txn.execute(sql, (user_id, device_id, user_id, device_id, up_to_stream_id))
return txn.rowcount
count = await self.db_pool.runInteraction(

View File

@ -2267,35 +2267,59 @@ class PersistEventsStore:
Forward extremities are handled when we first start persisting the events.
"""
# From the events passed in, add all of the prev events as backwards extremities.
# Ignore any events that are already backwards extrems or outliers.
query = (
"INSERT INTO event_backward_extremities (event_id, room_id)"
" SELECT ?, ? WHERE NOT EXISTS ("
" SELECT 1 FROM event_backward_extremities"
" WHERE event_id = ? AND room_id = ?"
" )"
# 1. Don't add an event as a extremity again if we already persisted it
# as a non-outlier.
# 2. Don't add an outlier as an extremity if it has no prev_events
" AND NOT EXISTS ("
" SELECT 1 FROM events"
" LEFT JOIN event_edges edge"
" ON edge.event_id = events.event_id"
" WHERE events.event_id = ? AND events.room_id = ? AND (events.outlier = FALSE OR edge.event_id IS NULL)"
" )"
room_id = events[0].room_id
potential_backwards_extremities = {
e_id
for ev in events
for e_id in ev.prev_event_ids()
if not ev.internal_metadata.is_outlier()
}
if not potential_backwards_extremities:
return
existing_events_outliers = self.db_pool.simple_select_many_txn(
txn,
table="events",
column="event_id",
iterable=potential_backwards_extremities,
keyvalues={"outlier": False},
retcols=("event_id",),
)
txn.execute_batch(
query,
[
(e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id)
for ev in events
for e_id in ev.prev_event_ids()
if not ev.internal_metadata.is_outlier()
],
potential_backwards_extremities.difference_update(
e for e, in existing_events_outliers
)
if potential_backwards_extremities:
self.db_pool.simple_upsert_many_txn(
txn,
table="event_backward_extremities",
key_names=("room_id", "event_id"),
key_values=[(room_id, ev) for ev in potential_backwards_extremities],
value_names=(),
value_values=(),
)
# Record the stream orderings where we have new gaps.
gap_events = [
(room_id, self._instance_name, ev.internal_metadata.stream_ordering)
for ev in events
if any(
e_id in potential_backwards_extremities
for e_id in ev.prev_event_ids()
)
]
self.db_pool.simple_insert_many_txn(
txn,
table="timeline_gaps",
keys=("room_id", "instance_name", "stream_ordering"),
values=gap_events,
)
# Delete all these events that we've already fetched and now know that their
# prev events are the new backwards extremeties.
query = (

View File

@ -2095,12 +2095,6 @@ class EventsWorkerStore(SQLBaseStore):
def _cleanup_old_transaction_ids_txn(txn: LoggingTransaction) -> None:
one_day_ago = self._clock.time_msec() - 24 * 60 * 60 * 1000
sql = """
DELETE FROM event_txn_id
WHERE inserted_ts < ?
"""
txn.execute(sql, (one_day_ago,))
sql = """
DELETE FROM event_txn_id_device_id
WHERE inserted_ts < ?

View File

@ -1616,3 +1616,50 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
retcol="instance_name",
desc="get_name_from_instance_id",
)
async def get_timeline_gaps(
self,
room_id: str,
from_token: Optional[RoomStreamToken],
to_token: RoomStreamToken,
) -> Optional[RoomStreamToken]:
"""Check if there is a gap, and return a token that marks the position
of the gap in the stream.
"""
sql = """
SELECT instance_name, stream_ordering
FROM timeline_gaps
WHERE room_id = ? AND ? < stream_ordering AND stream_ordering <= ?
ORDER BY stream_ordering
"""
rows = await self.db_pool.execute(
"get_timeline_gaps",
None,
sql,
room_id,
from_token.stream if from_token else 0,
to_token.get_max_stream_pos(),
)
if not rows:
return None
positions = [
PersistedEventPosition(instance_name, stream_ordering)
for instance_name, stream_ordering in rows
]
if from_token:
positions = [p for p in positions if p.persisted_after(from_token)]
positions = [p for p in positions if not p.persisted_after(to_token)]
if positions:
# We return a stream token that ensures the event *at* the position
# of the gap is included (as the gap is *before* the persisted
# event).
last_position = positions[-1]
return RoomStreamToken(stream=last_position.stream - 1)
return None

View File

@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
SCHEMA_VERSION = 82 # remember to update the list below when updating
SCHEMA_VERSION = 83 # remember to update the list below when updating
"""Represents the expectations made by the codebase about the database schema
This should be incremented whenever the codebase changes its requirements on the
@ -121,6 +121,9 @@ Changes in SCHEMA_VERSION = 81
Changes in SCHEMA_VERSION = 82
- The insertion_events, insertion_event_extremities, insertion_event_edges, and
batch_events tables are no longer purged in preparation for their removal.
Changes in SCHEMA_VERSION = 83
- The event_txn_id is no longer used.
"""

View File

@ -0,0 +1,25 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Records when we see a "gap in the timeline", due to missing events over
-- federation. We record this so that we can tell clients there is a gap (by
-- marking the timeline section of a sync request as limited).
CREATE TABLE IF NOT EXISTS timeline_gaps (
room_id TEXT NOT NULL,
instance_name TEXT NOT NULL,
stream_ordering BIGINT NOT NULL
);
CREATE INDEX timeline_gaps_room_id ON timeline_gaps(room_id, stream_ordering);

View File

@ -170,10 +170,10 @@ class RetryDestinationLimiter:
database in milliseconds, or zero if the last request was
successful.
backoff_on_404: Back off if we get a 404
backoff_on_failure: set to False if we should not increase the
retry interval on a failure.
notifier: A notifier used to mark servers as up.
replication_client A replication client used to mark servers as up.
backoff_on_all_error_codes: Whether we should back off on any
error code.
"""
@ -237,6 +237,9 @@ class RetryDestinationLimiter:
else:
valid_err_code = False
# Whether previous requests to the destination had been failing.
previously_failing = bool(self.failure_ts)
if success:
# We connected successfully.
if not self.retry_interval:
@ -282,6 +285,9 @@ class RetryDestinationLimiter:
if self.failure_ts is None:
self.failure_ts = retry_last_ts
# Whether the current request to the destination had been failing.
currently_failing = bool(self.failure_ts)
async def store_retry_timings() -> None:
try:
await self.store.set_destination_retry_timings(
@ -291,17 +297,15 @@ class RetryDestinationLimiter:
self.retry_interval,
)
if self.notifier:
# Inform the relevant places that the remote server is back up.
self.notifier.notify_remote_server_up(self.destination)
# If the server was previously failing, but is no longer.
if previously_failing and not currently_failing:
if self.notifier:
# Inform the relevant places that the remote server is back up.
self.notifier.notify_remote_server_up(self.destination)
if self.replication_client:
# If we're on a worker we try and inform master about this. The
# replication client doesn't hook into the notifier to avoid
# infinite loops where we send a `REMOTE_SERVER_UP` command to
# master, which then echoes it back to us which in turn pokes
# the notifier.
self.replication_client.send_remote_server_up(self.destination)
if self.replication_client:
# Inform other workers that the remote server is up.
self.replication_client.send_remote_server_up(self.destination)
except Exception:
logger.exception("Failed to store destination_retry_timings")

View File

@ -14,6 +14,8 @@
from typing import Any, List, Optional
from parameterized import parameterized
from twisted.test.proto_helpers import MemoryReactor
from synapse.api.constants import EventTypes, Membership
@ -21,6 +23,8 @@ from synapse.events import EventBase
from synapse.replication.tcp.commands import RdataCommand
from synapse.replication.tcp.streams._base import _STREAM_UPDATE_TARGET_ROW_COUNT
from synapse.replication.tcp.streams.events import (
_MAX_STATE_UPDATES_PER_ROOM,
EventsStreamAllStateRow,
EventsStreamCurrentStateRow,
EventsStreamEventRow,
EventsStreamRow,
@ -106,11 +110,21 @@ class EventsStreamTestCase(BaseStreamTestCase):
self.assertEqual([], received_rows)
def test_update_function_huge_state_change(self) -> None:
@parameterized.expand(
[(_STREAM_UPDATE_TARGET_ROW_COUNT, False), (_MAX_STATE_UPDATES_PER_ROOM, True)]
)
def test_update_function_huge_state_change(
self, num_state_changes: int, collapse_state_changes: bool
) -> None:
"""Test replication with many state events
Ensures that all events are correctly replicated when there are lots of
state change rows to be replicated.
Args:
num_state_changes: The number of state changes to create.
collapse_state_changes: Whether the state changes are expected to be
collapsed or not.
"""
# we want to generate lots of state changes at a single stream ID.
@ -145,7 +159,7 @@ class EventsStreamTestCase(BaseStreamTestCase):
events = [
self._inject_state_event(sender=OTHER_USER)
for _ in range(_STREAM_UPDATE_TARGET_ROW_COUNT)
for _ in range(num_state_changes)
]
self.replicate()
@ -202,8 +216,7 @@ class EventsStreamTestCase(BaseStreamTestCase):
row for row in self.test_handler.received_rdata_rows if row[0] == "events"
]
# first check the first two rows, which should be state1
# first check the first two rows, which should be the state1 event.
stream_name, token, row = received_rows.pop(0)
self.assertEqual("events", stream_name)
self.assertIsInstance(row, EventsStreamRow)
@ -217,7 +230,7 @@ class EventsStreamTestCase(BaseStreamTestCase):
self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
self.assertEqual(row.data.event_id, state1.event_id)
# now the last two rows, which should be state2
# now the last two rows, which should be the state2 event.
stream_name, token, row = received_rows.pop(-2)
self.assertEqual("events", stream_name)
self.assertIsInstance(row, EventsStreamRow)
@ -231,34 +244,54 @@ class EventsStreamTestCase(BaseStreamTestCase):
self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
self.assertEqual(row.data.event_id, state2.event_id)
# that should leave us with the rows for the PL event
self.assertEqual(len(received_rows), len(events) + 2)
# Based on the number of
if collapse_state_changes:
# that should leave us with the rows for the PL event, the state changes
# get collapsed into a single row.
self.assertEqual(len(received_rows), 2)
stream_name, token, row = received_rows.pop(0)
self.assertEqual("events", stream_name)
self.assertIsInstance(row, EventsStreamRow)
self.assertEqual(row.type, "ev")
self.assertIsInstance(row.data, EventsStreamEventRow)
self.assertEqual(row.data.event_id, pl_event.event_id)
# the state rows are unsorted
state_rows: List[EventsStreamCurrentStateRow] = []
for stream_name, _, row in received_rows:
stream_name, token, row = received_rows.pop(0)
self.assertEqual("events", stream_name)
self.assertIsInstance(row, EventsStreamRow)
self.assertEqual(row.type, "state")
self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
state_rows.append(row.data)
self.assertEqual(row.type, "ev")
self.assertIsInstance(row.data, EventsStreamEventRow)
self.assertEqual(row.data.event_id, pl_event.event_id)
state_rows.sort(key=lambda r: r.state_key)
stream_name, token, row = received_rows.pop(0)
self.assertIsInstance(row, EventsStreamRow)
self.assertEqual(row.type, "state-all")
self.assertIsInstance(row.data, EventsStreamAllStateRow)
self.assertEqual(row.data.room_id, state2.room_id)
sr = state_rows.pop(0)
self.assertEqual(sr.type, EventTypes.PowerLevels)
self.assertEqual(sr.event_id, pl_event.event_id)
for sr in state_rows:
self.assertEqual(sr.type, "test_state_event")
# "None" indicates the state has been deleted
self.assertIsNone(sr.event_id)
else:
# that should leave us with the rows for the PL event
self.assertEqual(len(received_rows), len(events) + 2)
stream_name, token, row = received_rows.pop(0)
self.assertEqual("events", stream_name)
self.assertIsInstance(row, EventsStreamRow)
self.assertEqual(row.type, "ev")
self.assertIsInstance(row.data, EventsStreamEventRow)
self.assertEqual(row.data.event_id, pl_event.event_id)
# the state rows are unsorted
state_rows: List[EventsStreamCurrentStateRow] = []
for stream_name, _, row in received_rows:
self.assertEqual("events", stream_name)
self.assertIsInstance(row, EventsStreamRow)
self.assertEqual(row.type, "state")
self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
state_rows.append(row.data)
state_rows.sort(key=lambda r: r.state_key)
sr = state_rows.pop(0)
self.assertEqual(sr.type, EventTypes.PowerLevels)
self.assertEqual(sr.event_id, pl_event.event_id)
for sr in state_rows:
self.assertEqual(sr.type, "test_state_event")
# "None" indicates the state has been deleted
self.assertIsNone(sr.event_id)
def test_update_function_state_row_limit(self) -> None:
"""Test replication with many state events over several stream ids."""

View File

@ -24,7 +24,10 @@ import synapse.rest.admin
from synapse.http.site import XForwardedForRequest
from synapse.rest.client import login
from synapse.server import HomeServer
from synapse.storage.databases.main.client_ips import LAST_SEEN_GRANULARITY
from synapse.storage.databases.main.client_ips import (
LAST_SEEN_GRANULARITY,
DeviceLastConnectionInfo,
)
from synapse.types import UserID
from synapse.util import Clock
@ -65,15 +68,15 @@ class ClientIpStoreTestCase(unittest.HomeserverTestCase):
)
r = result[(user_id, device_id)]
self.assertLessEqual(
{
"user_id": user_id,
"device_id": device_id,
"ip": "ip",
"user_agent": "user_agent",
"last_seen": 12345678000,
}.items(),
r.items(),
self.assertEqual(
DeviceLastConnectionInfo(
user_id=user_id,
device_id=device_id,
ip="ip",
user_agent="user_agent",
last_seen=12345678000,
),
r,
)
def test_insert_new_client_ip_none_device_id(self) -> None:
@ -201,13 +204,13 @@ class ClientIpStoreTestCase(unittest.HomeserverTestCase):
self.assertEqual(
result,
{
(user_id, device_id): {
"user_id": user_id,
"device_id": device_id,
"ip": "ip",
"user_agent": "user_agent",
"last_seen": 12345678000,
},
(user_id, device_id): DeviceLastConnectionInfo(
user_id=user_id,
device_id=device_id,
ip="ip",
user_agent="user_agent",
last_seen=12345678000,
),
},
)
@ -292,20 +295,20 @@ class ClientIpStoreTestCase(unittest.HomeserverTestCase):
self.assertEqual(
result,
{
(user_id, device_id_1): {
"user_id": user_id,
"device_id": device_id_1,
"ip": "ip_1",
"user_agent": "user_agent_1",
"last_seen": 12345678000,
},
(user_id, device_id_2): {
"user_id": user_id,
"device_id": device_id_2,
"ip": "ip_2",
"user_agent": "user_agent_3",
"last_seen": 12345688000 + LAST_SEEN_GRANULARITY,
},
(user_id, device_id_1): DeviceLastConnectionInfo(
user_id=user_id,
device_id=device_id_1,
ip="ip_1",
user_agent="user_agent_1",
last_seen=12345678000,
),
(user_id, device_id_2): DeviceLastConnectionInfo(
user_id=user_id,
device_id=device_id_2,
ip="ip_2",
user_agent="user_agent_3",
last_seen=12345688000 + LAST_SEEN_GRANULARITY,
),
},
)
@ -526,15 +529,15 @@ class ClientIpStoreTestCase(unittest.HomeserverTestCase):
)
r = result[(user_id, device_id)]
self.assertLessEqual(
{
"user_id": user_id,
"device_id": device_id,
"ip": None,
"user_agent": None,
"last_seen": None,
}.items(),
r.items(),
self.assertEqual(
DeviceLastConnectionInfo(
user_id=user_id,
device_id=device_id,
ip=None,
user_agent=None,
last_seen=None,
),
r,
)
# Register the background update to run again.
@ -561,15 +564,15 @@ class ClientIpStoreTestCase(unittest.HomeserverTestCase):
)
r = result[(user_id, device_id)]
self.assertLessEqual(
{
"user_id": user_id,
"device_id": device_id,
"ip": "ip",
"user_agent": "user_agent",
"last_seen": 0,
}.items(),
r.items(),
self.assertEqual(
DeviceLastConnectionInfo(
user_id=user_id,
device_id=device_id,
ip="ip",
user_agent="user_agent",
last_seen=0,
),
r,
)
def test_old_user_ips_pruned(self) -> None:
@ -640,15 +643,15 @@ class ClientIpStoreTestCase(unittest.HomeserverTestCase):
)
r = result2[(user_id, device_id)]
self.assertLessEqual(
{
"user_id": user_id,
"device_id": device_id,
"ip": "ip",
"user_agent": "user_agent",
"last_seen": 0,
}.items(),
r.items(),
self.assertEqual(
DeviceLastConnectionInfo(
user_id=user_id,
device_id=device_id,
ip="ip",
user_agent="user_agent",
last_seen=0,
),
r,
)
def test_invalid_user_agents_are_ignored(self) -> None:
@ -777,13 +780,13 @@ class ClientIpAuthTestCase(unittest.HomeserverTestCase):
self.store.get_last_client_ip_by_device(self.user_id, device_id)
)
r = result[(self.user_id, device_id)]
self.assertLessEqual(
{
"user_id": self.user_id,
"device_id": device_id,
"ip": expected_ip,
"user_agent": "Mozzila pizza",
"last_seen": 123456100,
}.items(),
r.items(),
self.assertEqual(
DeviceLastConnectionInfo(
user_id=self.user_id,
device_id=device_id,
ip=expected_ip,
user_agent="Mozzila pizza",
last_seen=123456100,
),
r,
)

View File

@ -11,6 +11,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest import mock
from synapse.notifier import Notifier
from synapse.replication.tcp.handler import ReplicationCommandHandler
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
from tests.unittest import HomeserverTestCase
@ -109,6 +113,77 @@ class RetryLimiterTestCase(HomeserverTestCase):
new_timings = self.get_success(store.get_destination_retry_timings("test_dest"))
self.assertIsNone(new_timings)
def test_notifier_replication(self) -> None:
"""Ensure the notifier/replication client is called only when expected."""
store = self.hs.get_datastores().main
notifier = mock.Mock(spec=Notifier)
replication_client = mock.Mock(spec=ReplicationCommandHandler)
limiter = self.get_success(
get_retry_limiter(
"test_dest",
self.clock,
store,
notifier=notifier,
replication_client=replication_client,
)
)
# The server is already up, nothing should occur.
self.pump(1)
with limiter:
pass
self.pump()
new_timings = self.get_success(store.get_destination_retry_timings("test_dest"))
self.assertIsNone(new_timings)
notifier.notify_remote_server_up.assert_not_called()
replication_client.send_remote_server_up.assert_not_called()
# Attempt again, but return an error. This will cause new retry timings, but
# should not trigger server up notifications.
self.pump(1)
try:
with limiter:
raise AssertionError("argh")
except AssertionError:
pass
self.pump()
new_timings = self.get_success(store.get_destination_retry_timings("test_dest"))
# The exact retry timings are tested separately.
self.assertIsNotNone(new_timings)
notifier.notify_remote_server_up.assert_not_called()
replication_client.send_remote_server_up.assert_not_called()
# A second failing request should be treated as the above.
self.pump(1)
try:
with limiter:
raise AssertionError("argh")
except AssertionError:
pass
self.pump()
new_timings = self.get_success(store.get_destination_retry_timings("test_dest"))
# The exact retry timings are tested separately.
self.assertIsNotNone(new_timings)
notifier.notify_remote_server_up.assert_not_called()
replication_client.send_remote_server_up.assert_not_called()
# A final successful attempt should generate a server up notification.
self.pump(1)
with limiter:
pass
self.pump()
new_timings = self.get_success(store.get_destination_retry_timings("test_dest"))
# The exact retry timings are tested separately.
self.assertIsNone(new_timings)
notifier.notify_remote_server_up.assert_called_once_with("test_dest")
replication_client.send_remote_server_up.assert_called_once_with("test_dest")
def test_max_retry_interval(self) -> None:
"""Test that `destination_max_retry_interval` setting works as expected"""
store = self.hs.get_datastores().main