From 39d131b016673bbd4d3c28095c8838b8c6dc0953 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 5 Jul 2023 17:25:00 +0100 Subject: [PATCH] Add basic read/write lock (#15782) --- changelog.d/15782.misc | 1 + synapse/_scripts/synapse_port_db.py | 9 +- synapse/storage/databases/main/lock.py | 226 ++++++++++---- .../04_read_write_locks_triggers.sql.postgres | 152 ++++++++++ .../04_read_write_locks_triggers.sql.sqlite | 119 ++++++++ tests/storage/databases/main/test_lock.py | 283 +++++++++++++++++- 6 files changed, 731 insertions(+), 59 deletions(-) create mode 100644 changelog.d/15782.misc create mode 100644 synapse/storage/schema/main/delta/78/04_read_write_locks_triggers.sql.postgres create mode 100644 synapse/storage/schema/main/delta/78/04_read_write_locks_triggers.sql.sqlite diff --git a/changelog.d/15782.misc b/changelog.d/15782.misc new file mode 100644 index 0000000000..aae493b973 --- /dev/null +++ b/changelog.d/15782.misc @@ -0,0 +1 @@ +Add read/write style cross-worker locks. diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py index e126a2e0c5..7c4aa0afa2 100755 --- a/synapse/_scripts/synapse_port_db.py +++ b/synapse/_scripts/synapse_port_db.py @@ -197,6 +197,11 @@ IGNORED_TABLES = { "ui_auth_sessions", "ui_auth_sessions_credentials", "ui_auth_sessions_ips", + # Ignore the worker locks table, as a) there shouldn't be any acquired locks + # after porting, and b) the circular foreign key constraints make it hard to + # port. + "worker_read_write_locks_mode", + "worker_read_write_locks", } @@ -805,7 +810,9 @@ class Porter: ) # Map from table name to args passed to `handle_table`, i.e. a tuple # of: `postgres_size`, `table_size`, `forward_chunk`, `backward_chunk`. - tables_to_port_info_map = {r[0]: r[1:] for r in setup_res} + tables_to_port_info_map = { + r[0]: r[1:] for r in setup_res if r[0] not in IGNORED_TABLES + } # Step 5. Do the copying. # diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py index 7270ef09da..c89b4f7919 100644 --- a/synapse/storage/databases/main/lock.py +++ b/synapse/storage/databases/main/lock.py @@ -25,6 +25,7 @@ from synapse.storage.database import ( LoggingDatabaseConnection, LoggingTransaction, ) +from synapse.storage.engines import PostgresEngine from synapse.util import Clock from synapse.util.stringutils import random_string @@ -68,12 +69,20 @@ class LockStore(SQLBaseStore): self._reactor = hs.get_reactor() self._instance_name = hs.get_instance_id() - # A map from `(lock_name, lock_key)` to the token of any locks that we - # think we currently hold. - self._live_tokens: WeakValueDictionary[ + # A map from `(lock_name, lock_key)` to lock that we think we + # currently hold. + self._live_lock_tokens: WeakValueDictionary[ Tuple[str, str], Lock ] = WeakValueDictionary() + # A map from `(lock_name, lock_key, token)` to read/write lock that we + # think we currently hold. For a given lock_name/lock_key, there can be + # multiple read locks at a time but only one write lock (no mixing read + # and write locks at the same time). + self._live_read_write_lock_tokens: WeakValueDictionary[ + Tuple[str, str, str], Lock + ] = WeakValueDictionary() + # When we shut down we want to remove the locks. Technically this can # lead to a race, as we may drop the lock while we are still processing. # However, a) it should be a small window, b) the lock is best effort @@ -91,11 +100,13 @@ class LockStore(SQLBaseStore): """Called when the server is shutting down""" logger.info("Dropping held locks due to shutdown") - # We need to take a copy of the tokens dict as dropping the locks will - # cause the dictionary to change. - locks = dict(self._live_tokens) + # We need to take a copy of the locks as dropping the locks will cause + # the dictionary to change. + locks = list(self._live_lock_tokens.values()) + list( + self._live_read_write_lock_tokens.values() + ) - for lock in locks.values(): + for lock in locks: await lock.release() logger.info("Dropped locks due to shutdown") @@ -122,7 +133,7 @@ class LockStore(SQLBaseStore): """ # Check if this process has taken out a lock and if it's still valid. - lock = self._live_tokens.get((lock_name, lock_key)) + lock = self._live_lock_tokens.get((lock_name, lock_key)) if lock and await lock.is_still_valid(): return None @@ -176,61 +187,111 @@ class LockStore(SQLBaseStore): self._reactor, self._clock, self, + read_write=False, lock_name=lock_name, lock_key=lock_key, token=token, ) - self._live_tokens[(lock_name, lock_key)] = lock + self._live_lock_tokens[(lock_name, lock_key)] = lock return lock - async def _is_lock_still_valid( - self, lock_name: str, lock_key: str, token: str - ) -> bool: - """Checks whether this instance still holds the lock.""" - last_renewed_ts = await self.db_pool.simple_select_one_onecol( - table="worker_locks", - keyvalues={ - "lock_name": lock_name, - "lock_key": lock_key, - "token": token, - }, - retcol="last_renewed_ts", - allow_none=True, - desc="is_lock_still_valid", - ) - return ( - last_renewed_ts is not None - and self._clock.time_msec() - _LOCK_TIMEOUT_MS < last_renewed_ts + async def try_acquire_read_write_lock( + self, + lock_name: str, + lock_key: str, + write: bool, + ) -> Optional["Lock"]: + """Try to acquire a lock for the given name/key. Will return an async + context manager if the lock is successfully acquired, which *must* be + used (otherwise the lock will leak). + """ + + now = self._clock.time_msec() + token = random_string(6) + + def _try_acquire_read_write_lock_txn(txn: LoggingTransaction) -> None: + # We attempt to acquire the lock by inserting into + # `worker_read_write_locks` and seeing if that fails any + # constraints. If it doesn't then we have acquired the lock, + # otherwise we haven't. + # + # Before that though we clear the table of any stale locks. + + delete_sql = """ + DELETE FROM worker_read_write_locks + WHERE last_renewed_ts < ? AND lock_name = ? AND lock_key = ?; + """ + + insert_sql = """ + INSERT INTO worker_read_write_locks (lock_name, lock_key, write_lock, instance_name, token, last_renewed_ts) + VALUES (?, ?, ?, ?, ?, ?) + """ + + if isinstance(self.database_engine, PostgresEngine): + # For Postgres we can send these queries at the same time. + txn.execute( + delete_sql + ";" + insert_sql, + ( + # DELETE args + now - _LOCK_TIMEOUT_MS, + lock_name, + lock_key, + # UPSERT args + lock_name, + lock_key, + write, + self._instance_name, + token, + now, + ), + ) + else: + # For SQLite these need to be two queries. + txn.execute( + delete_sql, + ( + now - _LOCK_TIMEOUT_MS, + lock_name, + lock_key, + ), + ) + txn.execute( + insert_sql, + ( + lock_name, + lock_key, + write, + self._instance_name, + token, + now, + ), + ) + + return + + try: + await self.db_pool.runInteraction( + "try_acquire_read_write_lock", + _try_acquire_read_write_lock_txn, + ) + except self.database_engine.module.IntegrityError: + return None + + lock = Lock( + self._reactor, + self._clock, + self, + read_write=True, + lock_name=lock_name, + lock_key=lock_key, + token=token, ) - async def _renew_lock(self, lock_name: str, lock_key: str, token: str) -> None: - """Attempt to renew the lock if we still hold it.""" - await self.db_pool.simple_update( - table="worker_locks", - keyvalues={ - "lock_name": lock_name, - "lock_key": lock_key, - "token": token, - }, - updatevalues={"last_renewed_ts": self._clock.time_msec()}, - desc="renew_lock", - ) + self._live_read_write_lock_tokens[(lock_name, lock_key, token)] = lock - async def _drop_lock(self, lock_name: str, lock_key: str, token: str) -> None: - """Attempt to drop the lock, if we still hold it""" - await self.db_pool.simple_delete( - table="worker_locks", - keyvalues={ - "lock_name": lock_name, - "lock_key": lock_key, - "token": token, - }, - desc="drop_lock", - ) - - self._live_tokens.pop((lock_name, lock_key), None) + return lock class Lock: @@ -259,6 +320,7 @@ class Lock: reactor: IReactorCore, clock: Clock, store: LockStore, + read_write: bool, lock_name: str, lock_key: str, token: str, @@ -266,13 +328,23 @@ class Lock: self._reactor = reactor self._clock = clock self._store = store + self._read_write = read_write self._lock_name = lock_name self._lock_key = lock_key self._token = token + self._table = "worker_read_write_locks" if read_write else "worker_locks" + self._looping_call = clock.looping_call( - self._renew, _RENEWAL_INTERVAL_MS, store, lock_name, lock_key, token + self._renew, + _RENEWAL_INTERVAL_MS, + store, + clock, + read_write, + lock_name, + lock_key, + token, ) self._dropped = False @@ -281,6 +353,8 @@ class Lock: @wrap_as_background_process("Lock._renew") async def _renew( store: LockStore, + clock: Clock, + read_write: bool, lock_name: str, lock_key: str, token: str, @@ -291,12 +365,34 @@ class Lock: don't end up with a reference to `self` in the reactor, which would stop this from being cleaned up if we dropped the context manager. """ - await store._renew_lock(lock_name, lock_key, token) + table = "worker_read_write_locks" if read_write else "worker_locks" + await store.db_pool.simple_update( + table=table, + keyvalues={ + "lock_name": lock_name, + "lock_key": lock_key, + "token": token, + }, + updatevalues={"last_renewed_ts": clock.time_msec()}, + desc="renew_lock", + ) async def is_still_valid(self) -> bool: """Check if the lock is still held by us""" - return await self._store._is_lock_still_valid( - self._lock_name, self._lock_key, self._token + last_renewed_ts = await self._store.db_pool.simple_select_one_onecol( + table=self._table, + keyvalues={ + "lock_name": self._lock_name, + "lock_key": self._lock_key, + "token": self._token, + }, + retcol="last_renewed_ts", + allow_none=True, + desc="is_lock_still_valid", + ) + return ( + last_renewed_ts is not None + and self._clock.time_msec() - _LOCK_TIMEOUT_MS < last_renewed_ts ) async def __aenter__(self) -> None: @@ -325,7 +421,23 @@ class Lock: if self._looping_call.running: self._looping_call.stop() - await self._store._drop_lock(self._lock_name, self._lock_key, self._token) + await self._store.db_pool.simple_delete( + table=self._table, + keyvalues={ + "lock_name": self._lock_name, + "lock_key": self._lock_key, + "token": self._token, + }, + desc="drop_lock", + ) + + if self._read_write: + self._store._live_read_write_lock_tokens.pop( + (self._lock_name, self._lock_key, self._token), None + ) + else: + self._store._live_lock_tokens.pop((self._lock_name, self._lock_key), None) + self._dropped = True def __del__(self) -> None: diff --git a/synapse/storage/schema/main/delta/78/04_read_write_locks_triggers.sql.postgres b/synapse/storage/schema/main/delta/78/04_read_write_locks_triggers.sql.postgres new file mode 100644 index 0000000000..e1a41be9c9 --- /dev/null +++ b/synapse/storage/schema/main/delta/78/04_read_write_locks_triggers.sql.postgres @@ -0,0 +1,152 @@ +/* Copyright 2023 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +-- We implement read/write style locks by using two tables with mutual foreign +-- key constraints. Note that this implementation is vulnerable to starving +-- writers if read locks repeatedly get acquired. +-- +-- The first table (`worker_read_write_locks_mode`) indicates that a given lock +-- has either been acquired in read mode *or* write mode, but not both. This is +-- enforced by the unique constraint. Each instance of a lock being acquired is +-- associated with a random `token`. +-- +-- The second table (`worker_read_write_locks`) tracks who has currently +-- acquired a given lock. For a given lock_name/lock_key, there can be multiple +-- read locks at a time but only one write lock (no mixing read and write locks +-- at the same time). +-- +-- The foreign key from the second to first table enforces that for any given +-- lock the second table cannot have a mix of rows with read or write. +-- +-- The foreign key from the first to second table enforces that we don't have a +-- row for a lock in the first table if not in the second table. +-- +-- +-- Furthermore, we add some triggers to automatically keep the first table up to +-- date when inserting/deleting from the second table. This reduces the number +-- of round trips needed to acquire and release locks, as those operations +-- simply become an INSERT or DELETE. These triggers are added in a separate +-- delta due to database specific syntax. + + +-- A table to track whether a lock is currently acquired, and if so whether its +-- in read or write mode. +CREATE TABLE worker_read_write_locks_mode ( + lock_name TEXT NOT NULL, + lock_key TEXT NOT NULL, + -- Whether this lock is in read (false) or write (true) mode + write_lock BOOLEAN NOT NULL, + -- A token that has currently acquired the lock. We need this so that we can + -- add a foreign constraint from this table to `worker_read_write_locks`. + token TEXT NOT NULL +); + +-- Ensure that we can only have one row per lock +CREATE UNIQUE INDEX worker_read_write_locks_mode_key ON worker_read_write_locks_mode (lock_name, lock_key); +-- We need this (redundant) constraint so that we can have a foreign key +-- constraint against this table. +CREATE UNIQUE INDEX worker_read_write_locks_mode_type ON worker_read_write_locks_mode (lock_name, lock_key, write_lock); + + +-- A table to track who has currently acquired a given lock. +CREATE TABLE worker_read_write_locks ( + lock_name TEXT NOT NULL, + lock_key TEXT NOT NULL, + -- We write the instance name to ease manual debugging, we don't ever read + -- from it. + -- Note: instance names aren't guarenteed to be unique. + instance_name TEXT NOT NULL, + -- Whether the process has taken out a "read" or a "write" lock. + write_lock BOOLEAN NOT NULL, + -- A random string generated each time an instance takes out a lock. Used by + -- the instance to tell whether the lock is still held by it (e.g. in the + -- case where the process stalls for a long time the lock may time out and + -- be taken out by another instance, at which point the original instance + -- can tell it no longer holds the lock as the tokens no longer match). + token TEXT NOT NULL, + last_renewed_ts BIGINT NOT NULL, + + -- This constraint ensures that a given lock has only been acquired in read + -- xor write mode, but not both. + FOREIGN KEY (lock_name, lock_key, write_lock) REFERENCES worker_read_write_locks_mode (lock_name, lock_key, write_lock) +); + +CREATE UNIQUE INDEX worker_read_write_locks_key ON worker_read_write_locks (lock_name, lock_key, token); +-- Ensures that only one instance can acquire a lock in write mode at a time. +CREATE UNIQUE INDEX worker_read_write_locks_write ON worker_read_write_locks (lock_name, lock_key) WHERE write_lock; + + +-- Add a foreign key constraint to ensure that if a lock is in +-- `worker_read_write_locks_mode` then there must be a corresponding row in +-- `worker_read_write_locks` (i.e. we don't accidentally end up with a row in +-- `worker_read_write_locks_mode` when the lock is not currently acquired). +-- +-- We only add to PostgreSQL as SQLite does not support adding constraints +-- after table creation, and so doesn't support "circular" foreign key +-- constraints. +ALTER TABLE worker_read_write_locks_mode ADD CONSTRAINT worker_read_write_locks_mode_foreign + FOREIGN KEY (lock_name, lock_key, token) REFERENCES worker_read_write_locks(lock_name, lock_key, token) DEFERRABLE INITIALLY DEFERRED; + + +-- Add a trigger to UPSERT into `worker_read_write_locks_mode` whenever we try +-- and acquire a lock, i.e. insert into `worker_read_write_locks`, +CREATE OR REPLACE FUNCTION upsert_read_write_lock_parent() RETURNS trigger AS $$ +BEGIN + INSERT INTO worker_read_write_locks_mode (lock_name, lock_key, write_lock, token) + VALUES (NEW.lock_name, NEW.lock_key, NEW.write_lock, NEW.token) + ON CONFLICT (lock_name, lock_key) + DO NOTHING; + RETURN NEW; +END +$$ +LANGUAGE plpgsql; + +CREATE TRIGGER upsert_read_write_lock_parent_trigger BEFORE INSERT ON worker_read_write_locks + FOR EACH ROW + EXECUTE PROCEDURE upsert_read_write_lock_parent(); + + +-- Ensure that we keep `worker_read_write_locks_mode` up to date whenever a lock +-- is released (i.e. a row deleted from `worker_read_write_locks`). Either we +-- update the `worker_read_write_locks_mode.token` to match another instance +-- that has currently acquired the lock, or we delete the row if nobody has +-- currently acquired a lock. +CREATE OR REPLACE FUNCTION delete_read_write_lock_parent() RETURNS trigger AS $$ +DECLARE + new_token TEXT; +BEGIN + SELECT token INTO new_token FROM worker_read_write_locks + WHERE + lock_name = OLD.lock_name + AND lock_key = OLD.lock_key; + + IF NOT FOUND THEN + DELETE FROM worker_read_write_locks_mode + WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key; + ELSE + UPDATE worker_read_write_locks_mode + SET token = new_token + WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key; + END IF; + + RETURN NEW; +END +$$ +LANGUAGE plpgsql; + +CREATE TRIGGER delete_read_write_lock_parent_trigger AFTER DELETE ON worker_read_write_locks + FOR EACH ROW + EXECUTE PROCEDURE delete_read_write_lock_parent(); diff --git a/synapse/storage/schema/main/delta/78/04_read_write_locks_triggers.sql.sqlite b/synapse/storage/schema/main/delta/78/04_read_write_locks_triggers.sql.sqlite new file mode 100644 index 0000000000..be2dfbbb8a --- /dev/null +++ b/synapse/storage/schema/main/delta/78/04_read_write_locks_triggers.sql.sqlite @@ -0,0 +1,119 @@ +/* Copyright 2023 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +-- c.f. the postgres version for context. The tables and constraints are the +-- same, however they need to be defined slightly differently to work around how +-- each database handles circular foreign key references. + + + +-- A table to track whether a lock is currently acquired, and if so whether its +-- in read or write mode. +CREATE TABLE worker_read_write_locks_mode ( + lock_name TEXT NOT NULL, + lock_key TEXT NOT NULL, + -- Whether this lock is in read (false) or write (true) mode + write_lock BOOLEAN NOT NULL, + -- A token that has currently acquired the lock. We need this so that we can + -- add a foreign constraint from this table to `worker_read_write_locks`. + token TEXT NOT NULL, + -- Add a foreign key constraint to ensure that if a lock is in + -- `worker_read_write_locks_mode` then there must be a corresponding row in + -- `worker_read_write_locks` (i.e. we don't accidentally end up with a row in + -- `worker_read_write_locks_mode` when the lock is not currently acquired). + FOREIGN KEY (lock_name, lock_key, token) REFERENCES worker_read_write_locks(lock_name, lock_key, token) DEFERRABLE INITIALLY DEFERRED +); + +-- Ensure that we can only have one row per lock +CREATE UNIQUE INDEX worker_read_write_locks_mode_key ON worker_read_write_locks_mode (lock_name, lock_key); +-- We need this (redundant) constraint so that we can have a foreign key +-- constraint against this table. +CREATE UNIQUE INDEX worker_read_write_locks_mode_type ON worker_read_write_locks_mode (lock_name, lock_key, write_lock); + + +-- A table to track who has currently acquired a given lock. +CREATE TABLE worker_read_write_locks ( + lock_name TEXT NOT NULL, + lock_key TEXT NOT NULL, + -- We write the instance name to ease manual debugging, we don't ever read + -- from it. + -- Note: instance names aren't guarenteed to be unique. + instance_name TEXT NOT NULL, + -- Whether the process has taken out a "read" or a "write" lock. + write_lock BOOLEAN NOT NULL, + -- A random string generated each time an instance takes out a lock. Used by + -- the instance to tell whether the lock is still held by it (e.g. in the + -- case where the process stalls for a long time the lock may time out and + -- be taken out by another instance, at which point the original instance + -- can tell it no longer holds the lock as the tokens no longer match). + token TEXT NOT NULL, + last_renewed_ts BIGINT NOT NULL, + + -- This constraint ensures that a given lock has only been acquired in read + -- xor write mode, but not both. + FOREIGN KEY (lock_name, lock_key, write_lock) REFERENCES worker_read_write_locks_mode (lock_name, lock_key, write_lock) +); + +CREATE UNIQUE INDEX worker_read_write_locks_key ON worker_read_write_locks (lock_name, lock_key, token); +-- Ensures that only one instance can acquire a lock in write mode at a time. +CREATE UNIQUE INDEX worker_read_write_locks_write ON worker_read_write_locks (lock_name, lock_key) WHERE write_lock; + + +-- Add a trigger to UPSERT into `worker_read_write_locks_mode` whenever we try +-- and acquire a lock, i.e. insert into `worker_read_write_locks`, +CREATE TRIGGER IF NOT EXISTS upsert_read_write_lock_parent_trigger +BEFORE INSERT ON worker_read_write_locks +FOR EACH ROW +BEGIN + -- First ensure that `worker_read_write_locks_mode` doesn't have stale + -- entries in it, as on SQLite we don't have the foreign key constraint to + -- enforce this. + DELETE FROM worker_read_write_locks_mode + WHERE lock_name = NEW.lock_name AND lock_key = NEW.lock_key + AND NOT EXISTS ( + SELECT 1 FROM worker_read_write_locks + WHERE lock_name = NEW.lock_name AND lock_key = NEW.lock_key + ); + + INSERT INTO worker_read_write_locks_mode (lock_name, lock_key, write_lock, token) + VALUES (NEW.lock_name, NEW.lock_key, NEW.write_lock, NEW.token) + ON CONFLICT (lock_name, lock_key) + DO NOTHING; +END; + +-- Ensure that we keep `worker_read_write_locks_mode` up to date whenever a lock +-- is released (i.e. a row deleted from `worker_read_write_locks`). Either we +-- update the `worker_read_write_locks_mode.token` to match another instance +-- that has currently acquired the lock, or we delete the row if nobody has +-- currently acquired a lock. +CREATE TRIGGER IF NOT EXISTS delete_read_write_lock_parent_trigger +AFTER DELETE ON worker_read_write_locks +FOR EACH ROW +BEGIN + DELETE FROM worker_read_write_locks_mode + WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key + AND NOT EXISTS ( + SELECT 1 FROM worker_read_write_locks + WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key + ); + + UPDATE worker_read_write_locks_mode + SET token = ( + SELECT token FROM worker_read_write_locks + WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key + ) + WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key; +END; diff --git a/tests/storage/databases/main/test_lock.py b/tests/storage/databases/main/test_lock.py index 56cb49d9b5..ad454f6dd8 100644 --- a/tests/storage/databases/main/test_lock.py +++ b/tests/storage/databases/main/test_lock.py @@ -166,4 +166,285 @@ class LockTestCase(unittest.HomeserverTestCase): # Now call the shutdown code self.get_success(self.store._on_shutdown()) - self.assertEqual(self.store._live_tokens, {}) + self.assertEqual(self.store._live_lock_tokens, {}) + + +class ReadWriteLockTestCase(unittest.HomeserverTestCase): + """Test the read/write lock implementation.""" + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.store = hs.get_datastores().main + + def test_acquire_write_contention(self) -> None: + """Test that we can only acquire one write lock at a time""" + # Track the number of tasks holding the lock. + # Should be at most 1. + in_lock = 0 + max_in_lock = 0 + + release_lock: "Deferred[None]" = Deferred() + + async def task() -> None: + nonlocal in_lock + nonlocal max_in_lock + + lock = await self.store.try_acquire_read_write_lock( + "name", "key", write=True + ) + if not lock: + return + + async with lock: + in_lock += 1 + max_in_lock = max(max_in_lock, in_lock) + + # Block to allow other tasks to attempt to take the lock. + await release_lock + + in_lock -= 1 + + # Start 3 tasks. + task1 = defer.ensureDeferred(task()) + task2 = defer.ensureDeferred(task()) + task3 = defer.ensureDeferred(task()) + + # Give the reactor a kick so that the database transaction returns. + self.pump() + + release_lock.callback(None) + + # Run the tasks to completion. + # To work around `Linearizer`s using a different reactor to sleep when + # contended (#12841), we call `runUntilCurrent` on + # `twisted.internet.reactor`, which is a different reactor to that used + # by the homeserver. + assert isinstance(reactor, ReactorBase) + self.get_success(task1) + reactor.runUntilCurrent() + self.get_success(task2) + reactor.runUntilCurrent() + self.get_success(task3) + + # At most one task should have held the lock at a time. + self.assertEqual(max_in_lock, 1) + + def test_acquire_multiple_reads(self) -> None: + """Test that we can acquire multiple read locks at a time""" + # Track the number of tasks holding the lock. + in_lock = 0 + max_in_lock = 0 + + release_lock: "Deferred[None]" = Deferred() + + async def task() -> None: + nonlocal in_lock + nonlocal max_in_lock + + lock = await self.store.try_acquire_read_write_lock( + "name", "key", write=False + ) + if not lock: + return + + async with lock: + in_lock += 1 + max_in_lock = max(max_in_lock, in_lock) + + # Block to allow other tasks to attempt to take the lock. + await release_lock + + in_lock -= 1 + + # Start 3 tasks. + task1 = defer.ensureDeferred(task()) + task2 = defer.ensureDeferred(task()) + task3 = defer.ensureDeferred(task()) + + # Give the reactor a kick so that the database transaction returns. + self.pump() + + release_lock.callback(None) + + # Run the tasks to completion. + # To work around `Linearizer`s using a different reactor to sleep when + # contended (#12841), we call `runUntilCurrent` on + # `twisted.internet.reactor`, which is a different reactor to that used + # by the homeserver. + assert isinstance(reactor, ReactorBase) + self.get_success(task1) + reactor.runUntilCurrent() + self.get_success(task2) + reactor.runUntilCurrent() + self.get_success(task3) + + # At most one task should have held the lock at a time. + self.assertEqual(max_in_lock, 3) + + def test_write_lock_acquired(self) -> None: + """Test that we can take out a write lock and that while we hold it + nobody else can take it out. + """ + # First to acquire this lock, so it should complete + lock = self.get_success( + self.store.try_acquire_read_write_lock("name", "key", write=True) + ) + assert lock is not None + + # Enter the context manager + self.get_success(lock.__aenter__()) + + # Attempting to acquire the lock again fails, as both read and write. + lock2 = self.get_success( + self.store.try_acquire_read_write_lock("name", "key", write=True) + ) + self.assertIsNone(lock2) + + lock3 = self.get_success( + self.store.try_acquire_read_write_lock("name", "key", write=False) + ) + self.assertIsNone(lock3) + + # Calling `is_still_valid` reports true. + self.assertTrue(self.get_success(lock.is_still_valid())) + + # Drop the lock + self.get_success(lock.__aexit__(None, None, None)) + + # We can now acquire the lock again. + lock4 = self.get_success( + self.store.try_acquire_read_write_lock("name", "key", write=True) + ) + assert lock4 is not None + self.get_success(lock4.__aenter__()) + self.get_success(lock4.__aexit__(None, None, None)) + + def test_read_lock_acquired(self) -> None: + """Test that we can take out a read lock and that while we hold it + only other reads can use it. + """ + # First to acquire this lock, so it should complete + lock = self.get_success( + self.store.try_acquire_read_write_lock("name", "key", write=False) + ) + assert lock is not None + + # Enter the context manager + self.get_success(lock.__aenter__()) + + # Attempting to acquire the write lock fails + lock2 = self.get_success( + self.store.try_acquire_read_write_lock("name", "key", write=True) + ) + self.assertIsNone(lock2) + + # Attempting to acquire a read lock succeeds + lock3 = self.get_success( + self.store.try_acquire_read_write_lock("name", "key", write=False) + ) + assert lock3 is not None + self.get_success(lock3.__aenter__()) + + # Calling `is_still_valid` reports true. + self.assertTrue(self.get_success(lock.is_still_valid())) + + # Drop the first lock + self.get_success(lock.__aexit__(None, None, None)) + + # Attempting to acquire the write lock still fails, as lock3 is still + # active. + lock4 = self.get_success( + self.store.try_acquire_read_write_lock("name", "key", write=True) + ) + self.assertIsNone(lock4) + + # Drop the still open third lock + self.get_success(lock3.__aexit__(None, None, None)) + + # We can now acquire the lock again. + lock5 = self.get_success( + self.store.try_acquire_read_write_lock("name", "key", write=True) + ) + assert lock5 is not None + self.get_success(lock5.__aenter__()) + self.get_success(lock5.__aexit__(None, None, None)) + + def test_maintain_lock(self) -> None: + """Test that we don't time out locks while they're still active (lock is + renewed in the background if the process is still alive)""" + + lock = self.get_success( + self.store.try_acquire_read_write_lock("name", "key", write=True) + ) + assert lock is not None + + self.get_success(lock.__aenter__()) + + # Wait for ages with the lock, we should not be able to get the lock. + self.reactor.advance(5 * _LOCK_TIMEOUT_MS / 1000) + self.pump() + + lock2 = self.get_success( + self.store.try_acquire_read_write_lock("name", "key", write=True) + ) + self.assertIsNone(lock2) + + self.get_success(lock.__aexit__(None, None, None)) + + def test_timeout_lock(self) -> None: + """Test that we time out locks if they're not updated for ages""" + + lock = self.get_success( + self.store.try_acquire_read_write_lock("name", "key", write=True) + ) + assert lock is not None + + self.get_success(lock.__aenter__()) + + # We simulate the process getting stuck by cancelling the looping call + # that keeps the lock active. + lock._looping_call.stop() + + # Wait for the lock to timeout. + self.reactor.advance(2 * _LOCK_TIMEOUT_MS / 1000) + + lock2 = self.get_success( + self.store.try_acquire_read_write_lock("name", "key", write=True) + ) + self.assertIsNotNone(lock2) + + self.assertFalse(self.get_success(lock.is_still_valid())) + + def test_drop(self) -> None: + """Test that dropping the context manager means we stop renewing the lock""" + + lock = self.get_success( + self.store.try_acquire_read_write_lock("name", "key", write=True) + ) + self.assertIsNotNone(lock) + + del lock + + # Wait for the lock to timeout. + self.reactor.advance(2 * _LOCK_TIMEOUT_MS / 1000) + + lock2 = self.get_success( + self.store.try_acquire_read_write_lock("name", "key", write=True) + ) + self.assertIsNotNone(lock2) + + def test_shutdown(self) -> None: + """Test that shutting down Synapse releases the locks""" + # Acquire two locks + lock = self.get_success( + self.store.try_acquire_read_write_lock("name", "key", write=True) + ) + self.assertIsNotNone(lock) + lock2 = self.get_success( + self.store.try_acquire_read_write_lock("name", "key2", write=True) + ) + self.assertIsNotNone(lock2) + + # Now call the shutdown code + self.get_success(self.store._on_shutdown()) + + self.assertEqual(self.store._live_read_write_lock_tokens, {})