Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes

anoa/redirect_instances
David Robertson 2023-01-25 15:19:17 +00:00
commit 42996efa78
No known key found for this signature in database
GPG Key ID: 903ECE108A39DEDD
29 changed files with 271 additions and 84 deletions

View File

@@ -0,0 +1 @@
+Faster joins: allow non-lazy-loading ("eager") syncs to complete after a partial join by omitting partial state rooms until they become fully stated.

1
changelog.d/14883.doc Normal file
View File

@@ -0,0 +1 @@
+Document the export user data command. Contributed by @thezaidbintariq.

1
changelog.d/14896.misc Normal file
View File

@@ -0,0 +1 @@
+Bump types-opentracing from 2.4.10 to 2.4.10.1.

1
changelog.d/14897.misc Normal file
View File

@@ -0,0 +1 @@
+Bump ruff from 0.0.224 to 0.0.230.

1
changelog.d/14899.misc Normal file
View File

@@ -0,0 +1 @@
+Bump types-requests from 2.28.11.7 to 2.28.11.8.

1
changelog.d/14900.misc Normal file
View File

@@ -0,0 +1 @@
+Bump types-psycopg2 from 2.9.21.2 to 2.9.21.4.

1
changelog.d/14901.misc Normal file
View File

@@ -0,0 +1 @@
+Bump types-commonmark from 0.9.2 to 0.9.2.1.

View File

@@ -0,0 +1 @@
+Faster joins: request partial joins by default. Admins can opt-out of this for the time being---see the upgrade notes.

1
changelog.d/14910.bugfix Normal file
View File

@@ -0,0 +1 @@
+Fix a regression introduced in Synapse 1.69.0 which can result in database corruption when database migrations are interrupted on sqlite.

View File

@@ -90,6 +90,19 @@ process, for example:
 # Upgrading to v1.76.0
 
+## Faster joins are enabled by default
+
+When joining a room for the first time, Synapse 1.76.0rc1 will request a partial join from the other server by default. Previously, server admins had to opt-in to this using an experimental config flag.
+
+Server admins can opt out of this feature for the time being by setting
+
+```yaml
+experimental:
+    faster_joins: false
+```
+
+in their server config.
+
 ## Changes to the account data replication streams
 
 Synapse has changed the format of the account data and devices replication

View File

@@ -32,6 +32,14 @@ What users are registered on my server?
 SELECT NAME from users;
 ```
 
+How can I export user data?
+---
+Synapse includes a Python command to export data for a specific user. It takes the homeserver
+configuration file and the full Matrix ID of the user to export:
+
+```console
+python -m synapse.app.admin_cmd -c <config_file> export-data <user_id>
+```
+
 Manually resetting passwords
 ---
 Users can reset their password through their client. Alternatively, a server admin

60
poetry.lock generated
View File

@@ -1906,28 +1906,28 @@ jupyter = ["ipywidgets (>=7.5.1,<8.0.0)"]
 
 [[package]]
 name = "ruff"
-version = "0.0.224"
+version = "0.0.230"
 description = "An extremely fast Python linter, written in Rust."
 category = "dev"
 optional = false
 python-versions = ">=3.7"
 files = [
-    {file = "ruff-0.0.224-py3-none-macosx_10_7_x86_64.whl", hash = "sha256:015277c45716733e99a19267fd244870bff2b619d4814065fe5cded5ab139b92"},
-    {file = "ruff-0.0.224-py3-none-macosx_10_9_x86_64.macosx_11_0_arm64.macosx_10_9_universal2.whl", hash = "sha256:ca8211b316fa2df70d90e38819862d58e4aec87643c2c343ba5102a7ff5d3221"},
-    {file = "ruff-0.0.224-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:637a502a37da3aac9832a0dd21255376b0320cf66e3d420d11000e09deb3e682"},
-    {file = "ruff-0.0.224-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:a85fe53b8193c3e9f7ca9bef7dfd3bcd079a86542a14b66f352ce0316de5c457"},
-    {file = "ruff-0.0.224-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:367c74a9ff9da165df7dc1e5e1fae5c4cda05cb94202bc9a6e5836243bc625c1"},
-    {file = "ruff-0.0.224-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:2d230fc497abdeb3b54d2808ba59802962b50e0611b558337d4c07e6d490ed6c"},
-    {file = "ruff-0.0.224-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:3df2bcb525fec6c2d551f7ab9843e42e8e4fa33586479953445ef64cbe6d6352"},
-    {file = "ruff-0.0.224-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:30e494a23c2f23a07adbd4a9df526057a8cdb4c88536bbc513b5a73385c3d5a7"},
-    {file = "ruff-0.0.224-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:eac5836f89f1388b7bb718a5c77cdd13e356737573d29581a18d7f575e42124c"},
-    {file = "ruff-0.0.224-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:d791073cfd40c8e697d4c79faa67e2ad54dc960854bfa0c0cba61ef4bb02d3b1"},
-    {file = "ruff-0.0.224-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:ef72a081dfe24bfb8aa0568e0f1ee0174fffbc6ebb0ae2b8b4cd3f9457cc867b"},
-    {file = "ruff-0.0.224-py3-none-musllinux_1_2_i686.whl", hash = "sha256:666ecfcf0019f5b0e72e0eba7a7330760b680ba0fb6413b139a594b117e612db"},
-    {file = "ruff-0.0.224-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:815d5d448e2bf5107340d6f47cbddac74186cb365c56bdcc2315fbcf59ebc466"},
-    {file = "ruff-0.0.224-py3-none-win32.whl", hash = "sha256:17d98c1f03e98c15d3f2c49e0ffdedc57b221217c4e3d7b6f732893101083240"},
-    {file = "ruff-0.0.224-py3-none-win_amd64.whl", hash = "sha256:3db4fe992cea69405061e09974c3955b750611b1e76161471c27cd2e8ccffa05"},
-    {file = "ruff-0.0.224.tar.gz", hash = "sha256:3b07c2e8da29605a8577b1aef90f8ca0c34a66683b77b06007f1970bc0689003"},
+    {file = "ruff-0.0.230-py3-none-macosx_10_7_x86_64.whl", hash = "sha256:fcc31d02cebda0a85e2e13a44642aea7f84362cb4f589e2f6b864e3928e4a7db"},
+    {file = "ruff-0.0.230-py3-none-macosx_10_9_x86_64.macosx_11_0_arm64.macosx_10_9_universal2.whl", hash = "sha256:45a7f2c7155d520b8ca255a01235763d5c25fd5e7af055e50a78c6d91ece0ced"},
+    {file = "ruff-0.0.230-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4eca8b185ab56cac67acc23287c3c8c62a0c0ffadc0787a3bef3a6e77eaed82f"},
+    {file = "ruff-0.0.230-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:ec2bcdb5040efd8082a3a98369eec4bdc5fd05f53cc6714cb2b725d557d4abe8"},
+    {file = "ruff-0.0.230-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:26571aee2b93b60e47e44478f72a9787b387f752e85b85f176739bd91b27cfd1"},
+    {file = "ruff-0.0.230-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:4b69c9883c3e264f8bb2d52bdabb88b8d9672750ea05f33e0ff52532824bd5c5"},
+    {file = "ruff-0.0.230-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:2b3dc88b83f200378a9b9c91036989f0285a10759514c42235ce02e5824ac8d0"},
+    {file = "ruff-0.0.230-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:767716f008dd3a40ec2318396f648fda437c6968087a4526cde5879e382cf477"},
+    {file = "ruff-0.0.230-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ac27a0f9b96d9923cef7d911790a21a19b51aec0f08375ccc47ad735b1054d78"},
+    {file = "ruff-0.0.230-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:729dfc7b7ad4f7d8761dc60c58f15372d6f5c2dd9b6c5952524f2bc3aec7de6a"},
+    {file = "ruff-0.0.230-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:ad086cf2e5fef274687121f673f0f9b60c8981ec07c2bb0448c459cbaef81bcb"},
+    {file = "ruff-0.0.230-py3-none-musllinux_1_2_i686.whl", hash = "sha256:4feaed0978c24687133cd11c7380de20aa841f893e24430c735cc6c3faba4837"},
+    {file = "ruff-0.0.230-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:1d1046d0d43a0f24b2e9e61d76bb201b486ad02e9787d3432af43bd7d16f2c2e"},
+    {file = "ruff-0.0.230-py3-none-win32.whl", hash = "sha256:4d627911c9ba57bcd2f2776f1c09a10d334db163cb5be8c892e7ec7b59ccf58c"},
+    {file = "ruff-0.0.230-py3-none-win_amd64.whl", hash = "sha256:27fd4891a1d0642f5b2038ebf86f8169bc3d466964bdfaa0ce2a65149bc7cced"},
+    {file = "ruff-0.0.230.tar.gz", hash = "sha256:a049f93af1057ac450e8c09559d44e371eda1c151b1b863c0013a1066fefddb0"},
 ]
 
 [[package]]
@@ -2494,14 +2494,14 @@ files = [
 
 [[package]]
 name = "types-commonmark"
-version = "0.9.2"
+version = "0.9.2.1"
 description = "Typing stubs for commonmark"
 category = "dev"
 optional = false
 python-versions = "*"
 files = [
-    {file = "types-commonmark-0.9.2.tar.gz", hash = "sha256:b894b67750c52fd5abc9a40a9ceb9da4652a391d75c1b480bba9cef90f19fc86"},
-    {file = "types_commonmark-0.9.2-py3-none-any.whl", hash = "sha256:56f20199a1f9a2924443211a0ef97f8b15a8a956a7f4e9186be6950bf38d6d02"},
+    {file = "types-commonmark-0.9.2.1.tar.gz", hash = "sha256:db8277e6aeb83429265eccece98a24954a9a502dde7bc7cf840a8741abd96b86"},
+    {file = "types_commonmark-0.9.2.1-py3-none-any.whl", hash = "sha256:9d5f500cb7eced801bde728137b0a10667bd853d328db641d03141f189e3aab4"},
 ]
 
 [[package]]
@@ -2570,14 +2570,14 @@ files = [
 
 [[package]]
 name = "types-opentracing"
-version = "2.4.10"
+version = "2.4.10.1"
 description = "Typing stubs for opentracing"
 category = "dev"
 optional = false
 python-versions = "*"
 files = [
-    {file = "types-opentracing-2.4.10.tar.gz", hash = "sha256:6101414f3b6d3b9c10f1c510a261e8439b6c8d67c723d5c2872084697b4580a7"},
-    {file = "types_opentracing-2.4.10-py3-none-any.whl", hash = "sha256:66d9cfbbdc4a6f8ca8189a15ad26f0fe41cee84c07057759c5d194e2505b84c2"},
+    {file = "types-opentracing-2.4.10.1.tar.gz", hash = "sha256:49e7e52b8b6e221865a9201fc8c2df0bcda8e7098d4ebb35903dbfa4b4d29195"},
+    {file = "types_opentracing-2.4.10.1-py3-none-any.whl", hash = "sha256:eb63394acd793e7d9e327956242349fee14580a87c025408dc268d4dd883cc24"},
 ]
 
 [[package]]
@@ -2594,14 +2594,14 @@ files = [
 
 [[package]]
 name = "types-psycopg2"
-version = "2.9.21.2"
+version = "2.9.21.4"
 description = "Typing stubs for psycopg2"
 category = "dev"
 optional = false
 python-versions = "*"
 files = [
-    {file = "types-psycopg2-2.9.21.2.tar.gz", hash = "sha256:bff045579642ce00b4a3c8f2e401b7f96dfaa34939f10be64b0dd3b53feca57d"},
-    {file = "types_psycopg2-2.9.21.2-py3-none-any.whl", hash = "sha256:084558d6bc4b2cfa249b06be0fdd9a14a69d307bae5bb5809a2f14cfbaa7a23f"},
+    {file = "types-psycopg2-2.9.21.4.tar.gz", hash = "sha256:d43dda166a70d073ddac40718e06539836b5844c99b58ef8d4489a8df2edf5c0"},
+    {file = "types_psycopg2-2.9.21.4-py3-none-any.whl", hash = "sha256:6a05dca0856996aa37d7abe436751803bf47ec006cabbefea092e057f23bc95d"},
 ]
 
 [[package]]
@@ -2633,14 +2633,14 @@ files = [
 
 [[package]]
 name = "types-requests"
-version = "2.28.11.7"
+version = "2.28.11.8"
 description = "Typing stubs for requests"
 category = "dev"
 optional = false
 python-versions = "*"
 files = [
-    {file = "types-requests-2.28.11.7.tar.gz", hash = "sha256:0ae38633734990d019b80f5463dfa164ebd3581998ac8435f526da6fe4d598c3"},
-    {file = "types_requests-2.28.11.7-py3-none-any.whl", hash = "sha256:b6a2fca8109f4fdba33052f11ed86102bddb2338519e1827387137fefc66a98b"},
+    {file = "types-requests-2.28.11.8.tar.gz", hash = "sha256:e67424525f84adfbeab7268a159d3c633862dafae15c5b19547ce1b55954f0a3"},
+    {file = "types_requests-2.28.11.8-py3-none-any.whl", hash = "sha256:61960554baca0008ae7e2db2bd3b322ca9a144d3e80ce270f5fb640817e40994"},
 ]
 
 [package.dependencies]
@@ -2964,4 +2964,4 @@ user-search = ["pyicu"]
 [metadata]
 lock-version = "2.0"
 python-versions = "^3.7.1"
-content-hash = "38867861f77c6faca817487efd02fbb7271b424d56744ad9ad248cd1dd297566"
+content-hash = "2673ef0530a42dae1df998bacfcaf88a563529b39461003a980743a97f02996f"

View File

@@ -309,7 +309,7 @@ all = [
 # We pin black so that our tests don't start failing on new releases.
 isort = ">=5.10.1"
 black = ">=22.3.0"
-ruff = "0.0.224"
+ruff = "0.0.230"
 
 # Typechecking
 mypy = "*"

View File

@@ -84,7 +84,7 @@ class ExperimentalConfig(Config):
         # experimental support for faster joins over federation
         # (MSC2775, MSC3706, MSC3895)
         # requires a target server that can provide a partial join response (MSC3706)
-        self.faster_joins_enabled: bool = experimental.get("faster_joins", False)
+        self.faster_joins_enabled: bool = experimental.get("faster_joins", True)
 
         # MSC3720 (Account status endpoint)
         self.msc3720_enabled: bool = experimental.get("msc3720_enabled", False)

View File

@@ -1868,22 +1868,17 @@ class FederationHandler:
         async with self._is_partial_state_room_linearizer.queue(room_id):
             logger.info("Clearing partial-state flag for %s", room_id)
-            success = await self.store.clear_partial_state_room(room_id)
+            new_stream_id = await self.store.clear_partial_state_room(room_id)
 
-            # Poke the notifier so that other workers see the write to
-            # the un-partial-stated rooms stream.
-            self._notifier.notify_replication()
-
-            if success:
+            if new_stream_id is not None:
                 logger.info("State resync complete for %s", room_id)
                 self._storage_controllers.state.notify_room_un_partial_stated(
                     room_id
                 )
 
-                # Poke the notifier so that other workers see the write to
-                # the un-partial-stated rooms stream.
-                self._notifier.notify_replication()
+                await self._notifier.on_un_partial_stated_room(
+                    room_id, new_stream_id
+                )
 
                 return
 
         # we raced against more events arriving with partial state. Go round

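To make the new flow easier to follow in isolation, here is a minimal, runnable sketch. `FakeStore` and `FakeNotifier` are hypothetical stand-ins, not Synapse's real classes; only the success/retry branching mirrors the handler change above.

```python
import asyncio
from typing import Optional


class FakeStore:
    """Hypothetical in-memory stand-in for the real datastore."""

    def __init__(self) -> None:
        self._partial_state_rooms = {"!room:example.org"}
        self._stream_id = 0

    async def clear_partial_state_room(self, room_id: str) -> Optional[int]:
        # Mirrors the new contract: a stream id on success, None if the
        # room still contains events with partial state.
        if room_id in self._partial_state_rooms:
            self._partial_state_rooms.discard(room_id)
            self._stream_id += 1
            return self._stream_id
        return None


class FakeNotifier:
    """Hypothetical stand-in for the notifier."""

    async def on_un_partial_stated_room(self, room_id: str, token: int) -> None:
        print(f"waking listeners for {room_id} at stream id {token}")


async def on_resync_attempt(store: FakeStore, notifier: FakeNotifier, room_id: str) -> None:
    new_stream_id = await store.clear_partial_state_room(room_id)
    if new_stream_id is not None:
        # Success: wake sync listeners and replication, as the handler now does.
        await notifier.on_un_partial_stated_room(room_id, new_stream_id)
    else:
        # Raced against more partial-state events arriving; go round the loop.
        print(f"{room_id} still has partial state; retrying later")


asyncio.run(on_resync_attempt(FakeStore(), FakeNotifier(), "!room:example.org"))
```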
View File

@@ -290,7 +290,7 @@ class SyncHandler:
             expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
         )
 
-        self.rooms_to_exclude = hs.config.server.rooms_to_exclude_from_sync
+        self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
 
     async def wait_for_sync_for_user(
         self,
@@ -1340,7 +1340,10 @@
         membership_change_events = []
         if since_token:
             membership_change_events = await self.store.get_membership_changes_for_user(
-                user_id, since_token.room_key, now_token.room_key, self.rooms_to_exclude
+                user_id,
+                since_token.room_key,
+                now_token.room_key,
+                self.rooms_to_exclude_globally,
             )
 
         mem_last_change_by_room_id: Dict[str, EventBase] = {}
@@ -1375,12 +1378,39 @@
             else:
                 mutable_joined_room_ids.discard(room_id)
 
+        # Tweak the set of rooms to return to the client for eager (non-lazy) syncs.
+        mutable_rooms_to_exclude = set(self.rooms_to_exclude_globally)
+        if not sync_config.filter_collection.lazy_load_members():
+            # Non-lazy syncs should never include partially stated rooms.
+            # Exclude all partially stated rooms from this sync.
+            for room_id in mutable_joined_room_ids:
+                if await self.store.is_partial_state_room(room_id):
+                    mutable_rooms_to_exclude.add(room_id)
+
+        # Incremental eager syncs should additionally include rooms that
+        # - we are joined to
+        # - are full-stated
+        # - became fully-stated at some point during the sync period
+        #   (These rooms will have been omitted during a previous eager sync.)
+        forced_newly_joined_room_ids = set()
+        if since_token and not sync_config.filter_collection.lazy_load_members():
+            un_partial_stated_rooms = (
+                await self.store.get_un_partial_stated_rooms_between(
+                    since_token.un_partial_stated_rooms_key,
+                    now_token.un_partial_stated_rooms_key,
+                    mutable_joined_room_ids,
+                )
+            )
+            for room_id in un_partial_stated_rooms:
+                if not await self.store.is_partial_state_room(room_id):
+                    forced_newly_joined_room_ids.add(room_id)
+
         # Now we have our list of joined room IDs, exclude as configured and freeze
         joined_room_ids = frozenset(
             (
                 room_id
                 for room_id in mutable_joined_room_ids
-                if room_id not in self.rooms_to_exclude
+                if room_id not in mutable_rooms_to_exclude
             )
         )
@@ -1397,6 +1427,8 @@
             since_token=since_token,
             now_token=now_token,
             joined_room_ids=joined_room_ids,
+            excluded_room_ids=frozenset(mutable_rooms_to_exclude),
+            forced_newly_joined_room_ids=frozenset(forced_newly_joined_room_ids),
             membership_change_events=membership_change_events,
         )
@@ -1834,14 +1866,16 @@
         # 3. Work out which rooms need reporting in the sync response.
         ignored_users = await self.store.ignored_users(user_id)
         if since_token:
-            room_changes = await self._get_rooms_changed(
+            room_changes = await self._get_room_changes_for_incremental_sync(
                 sync_result_builder, ignored_users
             )
             tags_by_room = await self.store.get_updated_tags(
                 user_id, since_token.account_data_key
             )
         else:
-            room_changes = await self._get_all_rooms(sync_result_builder, ignored_users)
+            room_changes = await self._get_room_changes_for_initial_sync(
+                sync_result_builder, ignored_users
+            )
             tags_by_room = await self.store.get_tags_for_user(user_id)
 
         log_kv({"rooms_changed": len(room_changes.room_entries)})
@@ -1900,7 +1934,7 @@
         assert since_token
 
-        if membership_change_events:
+        if membership_change_events or sync_result_builder.forced_newly_joined_room_ids:
             return True
 
         stream_id = since_token.room_key.stream
@@ -1909,7 +1943,7 @@
             return True
         return False
 
-    async def _get_rooms_changed(
+    async def _get_room_changes_for_incremental_sync(
         self,
         sync_result_builder: "SyncResultBuilder",
         ignored_users: FrozenSet[str],
@@ -1947,7 +1981,9 @@
         for event in membership_change_events:
             mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
 
-        newly_joined_rooms: List[str] = []
+        newly_joined_rooms: List[str] = list(
+            sync_result_builder.forced_newly_joined_room_ids
+        )
         newly_left_rooms: List[str] = []
         room_entries: List[RoomSyncResultBuilder] = []
         invited: List[InvitedSyncResult] = []
@@ -2153,7 +2189,7 @@
             newly_left_rooms,
         )
 
-    async def _get_all_rooms(
+    async def _get_room_changes_for_initial_sync(
         self,
         sync_result_builder: "SyncResultBuilder",
         ignored_users: FrozenSet[str],
@@ -2178,7 +2214,7 @@
         room_list = await self.store.get_rooms_for_local_user_where_membership_is(
             user_id=user_id,
             membership_list=Membership.LIST,
-            excluded_rooms=self.rooms_to_exclude,
+            excluded_rooms=sync_result_builder.excluded_room_ids,
         )
 
         room_entries = []
@@ -2549,6 +2585,13 @@ class SyncResultBuilder:
         since_token: The token supplied by user, or None.
         now_token: The token to sync up to.
        joined_room_ids: List of rooms the user is joined to
+        excluded_room_ids: Set of room ids we should omit from the /sync response.
+        forced_newly_joined_room_ids:
+            Rooms that should be presented in the /sync response as if they were
+            newly joined during the sync period, even if that's not the case.
+            (This is useful if the room was previously excluded from a /sync response,
+            and now the client should be made aware of it.)
+            Only used by incremental syncs.
 
         # The following mirror the fields in a sync response
         presence
@@ -2565,6 +2608,8 @@
     since_token: Optional[StreamToken]
     now_token: StreamToken
     joined_room_ids: FrozenSet[str]
+    excluded_room_ids: FrozenSet[str]
+    forced_newly_joined_room_ids: FrozenSet[str]
     membership_change_events: List[EventBase]
 
     presence: List[UserPresenceState] = attr.Factory(list)

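The room-filtering logic above reads more simply as plain set operations. A minimal sketch, assuming the same inputs the handler has; `filter_rooms_for_eager_sync` is a hypothetical name, not a Synapse function:

```python
from typing import AbstractSet, Optional, Set, Tuple


def filter_rooms_for_eager_sync(
    joined_rooms: AbstractSet[str],
    partial_state_rooms: AbstractSet[str],
    globally_excluded: AbstractSet[str],
    lazy_load_members: bool,
    un_partial_stated_since_last_sync: Optional[AbstractSet[str]],
) -> Tuple[Set[str], Set[str]]:
    excluded = set(globally_excluded)
    forced_newly_joined: Set[str] = set()

    if not lazy_load_members:
        # Eager syncs must not include partially stated rooms.
        excluded |= joined_rooms & partial_state_rooms

        # On an incremental sync, rooms that finished resyncing since the last
        # sync were omitted before, so present them as if newly joined.
        if un_partial_stated_since_last_sync is not None:
            forced_newly_joined = (
                un_partial_stated_since_last_sync & joined_rooms
            ) - partial_state_rooms

    return excluded, forced_newly_joined


excluded, forced = filter_rooms_for_eager_sync(
    joined_rooms={"!a:hs", "!b:hs"},
    partial_state_rooms={"!b:hs"},
    globally_excluded=set(),
    lazy_load_members=False,
    un_partial_stated_since_last_sync={"!a:hs"},
)
print(excluded)  # {'!b:hs'}: still partially stated, omitted from this sync
print(forced)    # {'!a:hs'}: became fully stated, surfaced as newly joined
```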
View File

@@ -314,6 +314,32 @@ class Notifier:
                 event_entries.append((entry, event.event_id))
 
         await self.notify_new_room_events(event_entries, max_room_stream_token)
 
+    async def on_un_partial_stated_room(
+        self,
+        room_id: str,
+        new_token: int,
+    ) -> None:
+        """Used by the resync background processes to wake up all listeners
+        of this room when it is un-partial-stated.
+
+        It will also notify replication listeners of the change in stream.
+        """
+
+        # Wake up all related user stream notifiers
+        user_streams = self.room_to_user_streams.get(room_id, set())
+        time_now_ms = self.clock.time_msec()
+        for user_stream in user_streams:
+            try:
+                user_stream.notify(
+                    StreamKeyType.UN_PARTIAL_STATED_ROOMS, new_token, time_now_ms
+                )
+            except Exception:
+                logger.exception("Failed to notify listener")
+
+        # Poke the replication so that other workers also see the write to
+        # the un-partial-stated rooms stream.
+        self.notify_replication()
+
     async def notify_new_room_events(
         self,
         event_entries: List[Tuple[_PendingRoomEventEntry, str]],

View File

@@ -260,6 +260,7 @@ class ReplicationDataHandler:
                     self._state_storage_controller.notify_room_un_partial_stated(
                         row.room_id
                     )
+                    await self.notifier.on_un_partial_stated_room(row.room_id, token)
             elif stream_name == UnPartialStatedEventStream.NAME:
                 for row in rows:
                     assert isinstance(row, UnPartialStatedEventStreamRow)

View File

@@ -292,6 +292,7 @@ class RelationsWorkerStore(SQLBaseStore):
                 to_device_key=0,
                 device_list_key=0,
                 groups_key=0,
+                un_partial_stated_rooms_key=0,
             )
 
         return events[:limit], next_token

View File

@@ -26,6 +26,7 @@ from typing import (
     Mapping,
     Optional,
     Sequence,
+    Set,
     Tuple,
     Union,
     cast,
@@ -1294,10 +1295,44 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
             instance_name
         )
 
+    async def get_un_partial_stated_rooms_between(
+        self, last_id: int, current_id: int, room_ids: Collection[str]
+    ) -> Set[str]:
+        """Get all rooms that got un partial stated between `last_id` exclusive and
+        `current_id` inclusive.
+
+        Returns:
+            The list of room ids.
+        """
+
+        if last_id == current_id:
+            return set()
+
+        def _get_un_partial_stated_rooms_between_txn(
+            txn: LoggingTransaction,
+        ) -> Set[str]:
+            sql = """
+                SELECT DISTINCT room_id FROM un_partial_stated_room_stream
+                WHERE ? < stream_id AND stream_id <= ? AND
+            """
+
+            clause, args = make_in_list_sql_clause(
+                self.database_engine, "room_id", room_ids
+            )
+
+            txn.execute(sql + clause, [last_id, current_id] + args)
+
+            return {r[0] for r in txn}
+
+        return await self.db_pool.runInteraction(
+            "get_un_partial_stated_rooms_between",
+            _get_un_partial_stated_rooms_between_txn,
+        )
+
     async def get_un_partial_stated_rooms_from_stream(
         self, instance_name: str, last_id: int, current_id: int, limit: int
     ) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]:
-        """Get updates for caches replication stream.
+        """Get updates for un partial stated rooms replication stream.
 
         Args:
             instance_name: The writer we want to fetch updates from. Unused
@@ -2304,16 +2339,16 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
             (room_id,),
         )
 
-    async def clear_partial_state_room(self, room_id: str) -> bool:
+    async def clear_partial_state_room(self, room_id: str) -> Optional[int]:
        """Clears the partial state flag for a room.
 
         Args:
             room_id: The room whose partial state flag is to be cleared.
 
         Returns:
-            `True` if the partial state flag has been cleared successfully.
+            The corresponding stream id for the un-partial-stated rooms stream.
 
-            `False` if the partial state flag could not be cleared because the room
+            `None` if the partial state flag could not be cleared because the room
             still contains events with partial state.
         """
         try:
@@ -2324,7 +2359,7 @@
                 room_id,
                 un_partial_state_room_stream_id,
             )
-            return True
+            return un_partial_state_room_stream_id
         except self.db_pool.engine.module.IntegrityError as e:
             # Assume that any `IntegrityError`s are due to partial state events.
             logger.info(
@@ -2332,7 +2367,7 @@
                 room_id,
                 e,
             )
-            return False
+            return None
 
     def _clear_partial_state_room_txn(
         self,

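The bounds in `get_un_partial_stated_rooms_between` are easy to get backwards, so here is a toy model of the query's semantics (exclusive lower bound, inclusive upper bound). The table rows and function name below are illustrative only:

```python
from typing import Collection, Set, Tuple


def un_partial_stated_rooms_between(
    rows: Collection[Tuple[int, str]],  # (stream_id, room_id) rows
    last_id: int,
    current_id: int,
    room_ids: Collection[str],
) -> Set[str]:
    if last_id == current_id:
        return set()
    wanted = set(room_ids)
    return {
        room_id
        for stream_id, room_id in rows
        if last_id < stream_id <= current_id and room_id in wanted
    }


rows = [(1, "!a:hs"), (2, "!b:hs"), (3, "!a:hs")]
# Stream id 1 falls outside the (1, 3] window; ids 2 and 3 are included.
print(un_partial_stated_rooms_between(rows, 1, 3, {"!a:hs", "!b:hs"}))
# {'!a:hs', '!b:hs'}
```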
View File

@@ -15,6 +15,7 @@
 import logging
 from typing import (
     TYPE_CHECKING,
+    AbstractSet,
     Collection,
     Dict,
     FrozenSet,
@@ -47,7 +48,13 @@ from synapse.storage.roommember import (
     ProfileInfo,
     RoomsForUser,
 )
-from synapse.types import JsonDict, PersistedEventPosition, StateMap, get_domain_from_id
+from synapse.types import (
+    JsonDict,
+    PersistedEventPosition,
+    StateMap,
+    StrCollection,
+    get_domain_from_id,
+)
 from synapse.util.async_helpers import Linearizer
 from synapse.util.caches import intern_string
 from synapse.util.caches.descriptors import _CacheContext, cached, cachedList
@@ -385,7 +392,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         self,
         user_id: str,
         membership_list: Collection[str],
-        excluded_rooms: Optional[List[str]] = None,
+        excluded_rooms: StrCollection = (),
     ) -> List[RoomsForUser]:
         """Get all the rooms for this *local* user where the membership for this user
         matches one in the membership list.
@@ -412,10 +419,12 @@
         )
 
         # Now we filter out forgotten and excluded rooms
-        rooms_to_exclude: Set[str] = await self.get_forgotten_rooms_for_user(user_id)
+        rooms_to_exclude = await self.get_forgotten_rooms_for_user(user_id)
 
         if excluded_rooms is not None:
-            rooms_to_exclude.update(set(excluded_rooms))
+            # Take a copy to avoid mutating the in-cache set
+            rooms_to_exclude = set(rooms_to_exclude)
+            rooms_to_exclude.update(excluded_rooms)
 
         return [room for room in rooms if room.room_id not in rooms_to_exclude]
@@ -1169,7 +1178,7 @@
         return count == 0
 
     @cached()
-    async def get_forgotten_rooms_for_user(self, user_id: str) -> Set[str]:
+    async def get_forgotten_rooms_for_user(self, user_id: str) -> AbstractSet[str]:
         """Gets all rooms the user has forgotten.
 
         Args:

View File

@@ -67,7 +67,7 @@ from synapse.storage.database import (
     make_in_list_sql_clause,
 )
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
-from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
+from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
 from synapse.storage.util.id_generators import MultiWriterIdGenerator
 from synapse.types import PersistedEventPosition, RoomStreamToken
 from synapse.util.caches.descriptors import cached
@@ -944,12 +944,40 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             room_id
             stream_key
         """
-        sql = (
-            "SELECT coalesce(MIN(topological_ordering), 0) FROM events"
-            " WHERE room_id = ? AND stream_ordering >= ?"
-        )
+        if isinstance(self.database_engine, PostgresEngine):
+            min_function = "LEAST"
+        elif isinstance(self.database_engine, Sqlite3Engine):
+            min_function = "MIN"
+        else:
+            raise RuntimeError(f"Unknown database engine {self.database_engine}")
+
+        # This query used to be
+        #    SELECT COALESCE(MIN(topological_ordering), 0) FROM events
+        #    WHERE room_id = ? and events.stream_ordering >= {stream_key}
+        # which returns 0 if the stream_key is newer than any event in
+        # the room. That's not wrong, but it seems to interact oddly with backfill,
+        # requiring a second call to /messages to actually backfill from a remote
+        # homeserver.
+        #
+        # Instead, rollback the stream ordering to that after the most recent event in
+        # this room.
+        sql = f"""
+            WITH fallback(max_stream_ordering) AS (
+                SELECT MAX(stream_ordering)
+                FROM events
+                WHERE room_id = ?
+            )
+            SELECT COALESCE(MIN(topological_ordering), 0) FROM events
+            WHERE
+                room_id = ?
+                AND events.stream_ordering >= {min_function}(
+                    ?,
+                    (SELECT max_stream_ordering FROM fallback)
+                )
+        """
+
         row = await self.db_pool.execute(
-            "get_current_topological_token", None, sql, room_id, stream_key
+            "get_current_topological_token", None, sql, room_id, room_id, stream_key
         )
         return row[0][0] if row else 0

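A toy model of the rewritten query may help: treat the room's events as (stream_ordering, topological_ordering) pairs, where the new `LEAST`/`MIN` clamp keeps a too-new stream key from collapsing the result to 0. This illustrates the query's semantics under those assumptions; it is not code from the commit:

```python
from typing import List, Tuple


def current_topological_token(
    events: List[Tuple[int, int]],  # (stream_ordering, topological_ordering)
    stream_key: int,
) -> int:
    if not events:
        return 0
    max_stream = max(s for s, _ in events)   # the WITH fallback(...) subquery
    threshold = min(stream_key, max_stream)  # {min_function}(?, max_stream_ordering)
    topos = [t for s, t in events if s >= threshold]
    return min(topos) if topos else 0


events = [(10, 1), (11, 2), (12, 3)]
# With a stream key newer than every event, the old query returned 0;
# the clamped version returns the topological ordering of the latest event.
print(current_topological_token(events, 500))  # 3
```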
View File

@@ -132,6 +132,9 @@ class BaseDatabaseEngine(Generic[ConnectionType, CursorType], metaclass=abc.ABCMeta):
         """Execute a chunk of SQL containing multiple semicolon-delimited statements.
 
         This is not provided by DBAPI2, and so needs engine-specific support.
+
+        Some database engines may automatically COMMIT the ongoing transaction both
+        before and after executing the script.
         """
         ...

View File

@@ -135,13 +135,14 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection, sqlite3.Cursor]):
         > than one statement with it, it will raise a Warning. Use executescript() if
         > you want to execute multiple SQL statements with one call.
 
-        Though the docs for `executescript` warn:
+        The script is wrapped in transaction control statements, since the docs for
+        `executescript` warn:
 
         > If there is a pending transaction, an implicit COMMIT statement is executed
         > first. No other implicit transaction control is performed; any transaction
         > control must be added to sql_script.
         """
-        cursor.executescript(script)
+        cursor.executescript(f"BEGIN TRANSACTION;\n{script}\nCOMMIT;")
 
     # Following functions taken from: https://github.com/coleifer/peewee

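The effect of the one-line change can be reproduced with the stdlib `sqlite3` module directly. A minimal demonstration (the table and script are made up for the example): wrapping the script in explicit `BEGIN`/`COMMIT` makes it atomic, so an interrupted migration can no longer leave half its statements applied.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# A made-up two-statement "migration" script.
script = """
CREATE TABLE migration_demo (id INTEGER PRIMARY KEY);
INSERT INTO migration_demo (id) VALUES (1);
"""

cursor = conn.cursor()
# executescript() first COMMITs any pending transaction and performs no
# transaction control of its own, so we supply it explicitly, as the
# patched engine does.
cursor.executescript(f"BEGIN TRANSACTION;\n{script}\nCOMMIT;")

print(conn.execute("SELECT count(*) FROM migration_demo").fetchone())  # (1,)
```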
View File

@@ -53,11 +53,15 @@ class EventSources:
             *(attribute.type(hs) for attribute in attr.fields(_EventSourcesInner))
         )
         self.store = hs.get_datastores().main
+        self._instance_name = hs.get_instance_name()
 
     def get_current_token(self) -> StreamToken:
         push_rules_key = self.store.get_max_push_rules_stream_id()
         to_device_key = self.store.get_to_device_stream_token()
         device_list_key = self.store.get_device_stream_token()
+        un_partial_stated_rooms_key = self.store.get_un_partial_stated_rooms_token(
+            self._instance_name
+        )
 
         token = StreamToken(
             room_key=self.sources.room.get_current_key(),
@@ -70,6 +74,7 @@
             device_list_key=device_list_key,
             # Groups key is unused.
             groups_key=0,
+            un_partial_stated_rooms_key=un_partial_stated_rooms_key,
         )
         return token
@@ -107,5 +112,6 @@
             to_device_key=0,
             device_list_key=0,
             groups_key=0,
+            un_partial_stated_rooms_key=0,
         )
         return token

View File

@@ -17,6 +17,7 @@ import re
 import string
 from typing import (
     TYPE_CHECKING,
+    AbstractSet,
     Any,
     ClassVar,
     Dict,
@@ -79,7 +80,7 @@
 
 # Collection[str] that does not include str itself; str being a Sequence[str]
 # is very misleading and results in bugs.
-StrCollection = Union[Tuple[str, ...], List[str], Set[str]]
+StrCollection = Union[Tuple[str, ...], List[str], AbstractSet[str]]
@@ -633,6 +634,7 @@ class StreamKeyType:
     PUSH_RULES: Final = "push_rules_key"
     TO_DEVICE: Final = "to_device_key"
     DEVICE_LIST: Final = "device_list_key"
+    UN_PARTIAL_STATED_ROOMS = "un_partial_stated_rooms_key"
 
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -640,7 +642,7 @@
     """A collection of keys joined together by underscores in the following
     order and which represent the position in their respective streams.
 
-    ex. `s2633508_17_338_6732159_1082514_541479_274711_265584_1`
+    ex. `s2633508_17_338_6732159_1082514_541479_274711_265584_1_379`
     1. `room_key`: `s2633508` which is a `RoomStreamToken`
        - `RoomStreamToken`'s can also look like `t426-2633508` or `m56~2.58~3.59`
        - See the docstring for `RoomStreamToken` for more details.
@@ -652,12 +654,13 @@
     7. `to_device_key`: `274711`
     8. `device_list_key`: `265584`
     9. `groups_key`: `1` (note that this key is now unused)
+    10. `un_partial_stated_rooms_key`: `379`
 
     You can see how many of these keys correspond to the various
     fields in a "/sync" response:
     ```json
     {
-        "next_batch": "s12_4_0_1_1_1_1_4_1",
+        "next_batch": "s12_4_0_1_1_1_1_4_1_1",
         "presence": {
             "events": []
         },
@@ -669,7 +672,7 @@
         "!QrZlfIDQLNLdZHqTnt:hs1": {
             "timeline": {
                 "events": [],
-                "prev_batch": "s10_4_0_1_1_1_1_4_1",
+                "prev_batch": "s10_4_0_1_1_1_1_4_1_1",
                 "limited": false
             },
             "state": {
@@ -705,6 +708,7 @@
     device_list_key: int
     # Note that the groups key is no longer used and may have bogus values.
     groups_key: int
+    un_partial_stated_rooms_key: int
 
     _SEPARATOR = "_"
     START: ClassVar["StreamToken"]
@@ -743,6 +747,7 @@
                 # serialized so that there will not be confusion in the future
                 # if additional tokens are added.
                 str(self.groups_key),
+                str(self.un_partial_stated_rooms_key),
             ]
         )
@@ -775,7 +780,7 @@
         return attr.evolve(self, **{key: new_value})
 
-StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0)
+StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0, 0)
 
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)

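Since the token grows a tenth field, a stripped-down illustration of the serialization may help; `MiniStreamToken` is a hypothetical toy, but the `"_"`-joined layout matches the docstring above and explains the lengthened test tokens below.

```python
from dataclasses import astuple, dataclass


@dataclass(frozen=True)
class MiniStreamToken:
    room_key: str  # e.g. "s12"; a full RoomStreamToken in Synapse itself
    presence_key: int
    typing_key: int
    receipt_key: int
    account_data_key: int
    push_rules_key: int
    to_device_key: int
    device_list_key: int
    groups_key: int  # unused, kept so old tokens still parse
    un_partial_stated_rooms_key: int

    def to_string(self) -> str:
        return "_".join(str(field) for field in astuple(self))


print(MiniStreamToken("s12", 4, 0, 1, 1, 1, 1, 4, 1, 1).to_string())
# s12_4_0_1_1_1_1_4_1_1, matching the updated "next_batch" example
```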
View File

@@ -1831,7 +1831,7 @@ class RoomMessagesTestCase(unittest.HomeserverTestCase):
 
     def test_topo_token_is_accepted(self) -> None:
         """Test Topo Token is accepted."""
-        token = "t1-0_0_0_0_0_0_0_0_0"
+        token = "t1-0_0_0_0_0_0_0_0_0_0"
         channel = self.make_request(
             "GET",
             "/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token),
@@ -1845,7 +1845,7 @@
 
     def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None:
         """Test that stream token is accepted for forward pagination."""
-        token = "s0_0_0_0_0_0_0_0_0"
+        token = "s0_0_0_0_0_0_0_0_0_0"
         channel = self.make_request(
             "GET",
             "/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token),

View File

@@ -1987,7 +1987,7 @@ class RoomMessageListTestCase(RoomBase):
         self.room_id = self.helper.create_room_as(self.user_id)
 
     def test_topo_token_is_accepted(self) -> None:
-        token = "t1-0_0_0_0_0_0_0_0_0"
+        token = "t1-0_0_0_0_0_0_0_0_0_0"
         channel = self.make_request(
             "GET", "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token)
         )
@@ -1998,7 +1998,7 @@
         self.assertTrue("end" in channel.json_body)
 
     def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None:
-        token = "s0_0_0_0_0_0_0_0_0"
+        token = "s0_0_0_0_0_0_0_0_0_0"
         channel = self.make_request(
             "GET", "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token)
         )
@@ -2728,7 +2728,7 @@ class LabelsTestCase(unittest.HomeserverTestCase):
         """Test that we can filter by a label on a /messages request."""
         self._send_labelled_messages_in_room()
 
-        token = "s0_0_0_0_0_0_0_0_0"
+        token = "s0_0_0_0_0_0_0_0_0_0"
         channel = self.make_request(
             "GET",
             "/rooms/%s/messages?access_token=%s&from=%s&filter=%s"
@@ -2745,7 +2745,7 @@
         """Test that we can filter by the absence of a label on a /messages request."""
         self._send_labelled_messages_in_room()
 
-        token = "s0_0_0_0_0_0_0_0_0"
+        token = "s0_0_0_0_0_0_0_0_0_0"
         channel = self.make_request(
             "GET",
             "/rooms/%s/messages?access_token=%s&from=%s&filter=%s"
@@ -2768,7 +2768,7 @@
         """
         self._send_labelled_messages_in_room()
 
-        token = "s0_0_0_0_0_0_0_0_0"
+        token = "s0_0_0_0_0_0_0_0_0_0"
         channel = self.make_request(
             "GET",
             "/rooms/%s/messages?access_token=%s&from=%s&filter=%s"

View File

@@ -913,7 +913,9 @@ class ExcludeRoomTestCase(unittest.HomeserverTestCase):
 
         # We need to manually append the room ID, because we can't know the ID before
         # creating the room, and we can't set the config after starting the homeserver.
-        self.hs.get_sync_handler().rooms_to_exclude.append(self.excluded_room_id)
+        self.hs.get_sync_handler().rooms_to_exclude_globally.append(
+            self.excluded_room_id
+        )
 
     def test_join_leave(self) -> None:
         """Tests that rooms are correctly excluded from the 'join' and 'leave' sections of