Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes
commit
b7d0c7d3fd
|
@ -0,0 +1 @@
|
||||||
|
Fix a rare race that could block new events from being sent for up to two minutes. Introduced in v1.90.0.
|
|
@ -1 +1 @@
|
||||||
Clean-up calling `setup_background_tasks` in unit tests.
|
Improve presence tests.
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Improve presence tests.
|
|
@ -0,0 +1 @@
|
||||||
|
Reduce DB contention on worker locks.
|
|
@ -0,0 +1 @@
|
||||||
|
Fix a bug introduced in 1.87 where Synapse would send an excessive number of federation requests to servers which have been offline for a long time. Contributed by Nico.
|
|
@ -0,0 +1 @@
|
||||||
|
Task scheduler: mark task as active if we are scheduling as soon as possible.
|
|
@ -92,7 +92,7 @@ class TaskSchedulerWorkerStore(SQLBaseStore):
|
||||||
if clauses:
|
if clauses:
|
||||||
sql = sql + " WHERE " + " AND ".join(clauses)
|
sql = sql + " WHERE " + " AND ".join(clauses)
|
||||||
|
|
||||||
sql = sql + "ORDER BY timestamp"
|
sql = sql + " ORDER BY timestamp"
|
||||||
|
|
||||||
txn.execute(sql, args)
|
txn.execute(sql, args)
|
||||||
return self.db_pool.cursor_to_dict(txn)
|
return self.db_pool.cursor_to_dict(txn)
|
||||||
|
|
|
@ -242,8 +242,6 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
|
||||||
) -> None:
|
) -> None:
|
||||||
# Upsert retry time interval if retry_interval is zero (i.e. we're
|
# Upsert retry time interval if retry_interval is zero (i.e. we're
|
||||||
# resetting it) or greater than the existing retry interval.
|
# resetting it) or greater than the existing retry interval.
|
||||||
# We also upsert when the new retry interval is the same as the existing one,
|
|
||||||
# since it will be the case when `destination_max_retry_interval` is reached.
|
|
||||||
#
|
#
|
||||||
# WARNING: This is executed in autocommit, so we shouldn't add any more
|
# WARNING: This is executed in autocommit, so we shouldn't add any more
|
||||||
# SQL calls in here (without being very careful).
|
# SQL calls in here (without being very careful).
|
||||||
|
@ -258,8 +256,10 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
|
||||||
retry_interval = EXCLUDED.retry_interval
|
retry_interval = EXCLUDED.retry_interval
|
||||||
WHERE
|
WHERE
|
||||||
EXCLUDED.retry_interval = 0
|
EXCLUDED.retry_interval = 0
|
||||||
|
OR EXCLUDED.retry_last_ts = 0
|
||||||
OR destinations.retry_interval IS NULL
|
OR destinations.retry_interval IS NULL
|
||||||
OR destinations.retry_interval <= EXCLUDED.retry_interval
|
OR destinations.retry_interval < EXCLUDED.retry_interval
|
||||||
|
OR destinations.retry_last_ts < EXCLUDED.retry_last_ts
|
||||||
"""
|
"""
|
||||||
|
|
||||||
txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval))
|
txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval))
|
||||||
|
|
|
@ -0,0 +1,37 @@
|
||||||
|
/* Copyright 2023 The Matrix.org Foundation C.I.C
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
-- To avoid the possibility of a deadlock, lock the
|
||||||
|
-- matching `worker_read_write_locks_mode` row so that we serialize inserts/deletes
|
||||||
|
-- for a specific lock name/key.
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION delete_read_write_lock_parent_before() RETURNS trigger AS $$
|
||||||
|
BEGIN
|
||||||
|
-- `PERFORM` is a `SELECT` which discards the rows.
|
||||||
|
PERFORM * FROM worker_read_write_locks_mode
|
||||||
|
WHERE
|
||||||
|
lock_name = OLD.lock_name
|
||||||
|
AND lock_key = OLD.lock_key
|
||||||
|
FOR UPDATE;
|
||||||
|
|
||||||
|
RETURN OLD;
|
||||||
|
END
|
||||||
|
$$
|
||||||
|
LANGUAGE plpgsql;
|
||||||
|
|
||||||
|
DROP TRIGGER IF EXISTS delete_read_write_lock_parent_before_trigger ON worker_read_write_locks;
|
||||||
|
CREATE TRIGGER delete_read_write_lock_parent_before_trigger BEFORE DELETE ON worker_read_write_locks
|
||||||
|
FOR EACH ROW
|
||||||
|
EXECUTE PROCEDURE delete_read_write_lock_parent_before();
|
|
@ -0,0 +1,37 @@
|
||||||
|
/* Copyright 2023 The Matrix.org Foundation C.I.C
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
-- Fix up the triggers that were in `78/04_read_write_locks_triggers.sql`
|
||||||
|
|
||||||
|
-- Reduce the number of writes we do on this table.
|
||||||
|
--
|
||||||
|
-- Note that we still want to lock the row here (i.e. still do a `DO UPDATE
|
||||||
|
-- SET`) so that we serialize updates.
|
||||||
|
CREATE OR REPLACE FUNCTION upsert_read_write_lock_parent() RETURNS trigger AS $$
|
||||||
|
BEGIN
|
||||||
|
INSERT INTO worker_read_write_locks_mode (lock_name, lock_key, write_lock, token)
|
||||||
|
VALUES (NEW.lock_name, NEW.lock_key, NEW.write_lock, NEW.token)
|
||||||
|
ON CONFLICT (lock_name, lock_key)
|
||||||
|
DO UPDATE SET write_lock = NEW.write_lock
|
||||||
|
WHERE OLD.write_lock != NEW.write_lock;
|
||||||
|
RETURN NEW;
|
||||||
|
END
|
||||||
|
$$
|
||||||
|
LANGUAGE plpgsql;
|
||||||
|
|
||||||
|
DROP TRIGGER IF EXISTS upsert_read_write_lock_parent_trigger ON worker_read_write_locks;
|
||||||
|
CREATE TRIGGER upsert_read_write_lock_parent_trigger BEFORE INSERT ON worker_read_write_locks
|
||||||
|
FOR EACH ROW
|
||||||
|
EXECUTE PROCEDURE upsert_read_write_lock_parent();
|
|
@ -154,13 +154,15 @@ class TaskScheduler:
|
||||||
f"No function associated with action {action} of the scheduled task"
|
f"No function associated with action {action} of the scheduled task"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
status = TaskStatus.SCHEDULED
|
||||||
if timestamp is None or timestamp < self._clock.time_msec():
|
if timestamp is None or timestamp < self._clock.time_msec():
|
||||||
timestamp = self._clock.time_msec()
|
timestamp = self._clock.time_msec()
|
||||||
|
status = TaskStatus.ACTIVE
|
||||||
|
|
||||||
task = ScheduledTask(
|
task = ScheduledTask(
|
||||||
random_string(16),
|
random_string(16),
|
||||||
action,
|
action,
|
||||||
TaskStatus.SCHEDULED,
|
status,
|
||||||
timestamp,
|
timestamp,
|
||||||
resource_id,
|
resource_id,
|
||||||
params,
|
params,
|
||||||
|
|
|
@ -38,6 +38,7 @@ from synapse.handlers.presence import (
|
||||||
from synapse.rest import admin
|
from synapse.rest import admin
|
||||||
from synapse.rest.client import room
|
from synapse.rest.client import room
|
||||||
from synapse.server import HomeServer
|
from synapse.server import HomeServer
|
||||||
|
from synapse.storage.database import LoggingDatabaseConnection
|
||||||
from synapse.types import JsonDict, UserID, get_domain_from_id
|
from synapse.types import JsonDict, UserID, get_domain_from_id
|
||||||
from synapse.util import Clock
|
from synapse.util import Clock
|
||||||
|
|
||||||
|
@ -513,6 +514,121 @@ class PresenceTimeoutTestCase(unittest.TestCase):
|
||||||
self.assertEqual(state, new_state)
|
self.assertEqual(state, new_state)
|
||||||
|
|
||||||
|
|
||||||
|
class PresenceHandlerInitTestCase(unittest.HomeserverTestCase):
|
||||||
|
def default_config(self) -> JsonDict:
|
||||||
|
config = super().default_config()
|
||||||
|
# Disable background tasks on this worker so that the PresenceHandler isn't
|
||||||
|
# loaded until we request it.
|
||||||
|
config["run_background_tasks_on"] = "other"
|
||||||
|
return config
|
||||||
|
|
||||||
|
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
|
||||||
|
self.user_id = f"@test:{self.hs.config.server.server_name}"
|
||||||
|
|
||||||
|
# Move the reactor to the initial time.
|
||||||
|
self.reactor.advance(1000)
|
||||||
|
now = self.clock.time_msec()
|
||||||
|
|
||||||
|
main_store = hs.get_datastores().main
|
||||||
|
self.get_success(
|
||||||
|
main_store.update_presence(
|
||||||
|
[
|
||||||
|
UserPresenceState(
|
||||||
|
user_id=self.user_id,
|
||||||
|
state=PresenceState.ONLINE,
|
||||||
|
last_active_ts=now,
|
||||||
|
last_federation_update_ts=now,
|
||||||
|
last_user_sync_ts=now,
|
||||||
|
status_msg=None,
|
||||||
|
currently_active=True,
|
||||||
|
)
|
||||||
|
]
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Regenerate the preloaded presence information on PresenceStore.
|
||||||
|
def refill_presence(db_conn: LoggingDatabaseConnection) -> None:
|
||||||
|
main_store._presence_on_startup = main_store._get_active_presence(db_conn)
|
||||||
|
|
||||||
|
self.get_success(main_store.db_pool.runWithConnection(refill_presence))
|
||||||
|
|
||||||
|
def test_restored_presence_idles(self) -> None:
|
||||||
|
"""The presence state restored from the database should not persist forever."""
|
||||||
|
|
||||||
|
# Get the handler (which kicks off a bunch of timers).
|
||||||
|
presence_handler = self.hs.get_presence_handler()
|
||||||
|
|
||||||
|
# Assert the user is online.
|
||||||
|
state = self.get_success(
|
||||||
|
presence_handler.get_state(UserID.from_string(self.user_id))
|
||||||
|
)
|
||||||
|
self.assertEqual(state.state, PresenceState.ONLINE)
|
||||||
|
|
||||||
|
# Advance such that the user should timeout.
|
||||||
|
self.reactor.advance(SYNC_ONLINE_TIMEOUT / 1000)
|
||||||
|
self.reactor.pump([5])
|
||||||
|
|
||||||
|
# Check that the user is now offline.
|
||||||
|
state = self.get_success(
|
||||||
|
presence_handler.get_state(UserID.from_string(self.user_id))
|
||||||
|
)
|
||||||
|
self.assertEqual(state.state, PresenceState.OFFLINE)
|
||||||
|
|
||||||
|
@parameterized.expand(
|
||||||
|
[
|
||||||
|
(PresenceState.BUSY, PresenceState.BUSY),
|
||||||
|
(PresenceState.ONLINE, PresenceState.ONLINE),
|
||||||
|
(PresenceState.UNAVAILABLE, PresenceState.UNAVAILABLE),
|
||||||
|
# Offline syncs don't update the state.
|
||||||
|
(PresenceState.OFFLINE, PresenceState.ONLINE),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
@unittest.override_config({"experimental_features": {"msc3026_enabled": True}})
|
||||||
|
def test_restored_presence_online_after_sync(
|
||||||
|
self, sync_state: str, expected_state: str
|
||||||
|
) -> None:
|
||||||
|
"""
|
||||||
|
The presence state restored from the database should be overridden with sync after a timeout.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
sync_state: The presence state of the new sync.
|
||||||
|
expected_state: The expected presence right after the sync.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Get the handler (which kicks off a bunch of timers).
|
||||||
|
presence_handler = self.hs.get_presence_handler()
|
||||||
|
|
||||||
|
# Assert the user is online, as restored.
|
||||||
|
state = self.get_success(
|
||||||
|
presence_handler.get_state(UserID.from_string(self.user_id))
|
||||||
|
)
|
||||||
|
self.assertEqual(state.state, PresenceState.ONLINE)
|
||||||
|
|
||||||
|
# Advance slightly and sync.
|
||||||
|
self.reactor.advance(SYNC_ONLINE_TIMEOUT / 1000 / 2)
|
||||||
|
self.get_success(
|
||||||
|
presence_handler.user_syncing(
|
||||||
|
self.user_id, sync_state != PresenceState.OFFLINE, sync_state
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Assert the user is in the expected state.
|
||||||
|
state = self.get_success(
|
||||||
|
presence_handler.get_state(UserID.from_string(self.user_id))
|
||||||
|
)
|
||||||
|
self.assertEqual(state.state, expected_state)
|
||||||
|
|
||||||
|
# Advance such that the user's preloaded data times out, but not the new sync.
|
||||||
|
self.reactor.advance(SYNC_ONLINE_TIMEOUT / 1000 / 2)
|
||||||
|
self.reactor.pump([5])
|
||||||
|
|
||||||
|
# Check that the user is in the sync state (as the client is still syncing).
|
||||||
|
state = self.get_success(
|
||||||
|
presence_handler.get_state(UserID.from_string(self.user_id))
|
||||||
|
)
|
||||||
|
self.assertEqual(state.state, sync_state)
|
||||||
|
|
||||||
|
|
||||||
class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
|
class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
|
||||||
user_id = "@test:server"
|
user_id = "@test:server"
|
||||||
user_id_obj = UserID.from_string(user_id)
|
user_id_obj = UserID.from_string(user_id)
|
||||||
|
|
Loading…
Reference in New Issue