Fix rare bug that broke looping calls (#16210)
* Fix rare bug that broke looping calls We can't interact with the reactor from the main thread via looping call. Introduced in v1.90.0 / #15791. * Newsfilepull/16213/head
parent
05d824526a
commit
a2e0d4cd60
|
@ -0,0 +1 @@
|
||||||
|
Fix rare bug that broke looping calls, which could lead to e.g. linearly increasing memory usage. Introduced in v1.90.0.
|
|
@ -17,7 +17,7 @@ from types import TracebackType
|
||||||
from typing import TYPE_CHECKING, Collection, Optional, Set, Tuple, Type
|
from typing import TYPE_CHECKING, Collection, Optional, Set, Tuple, Type
|
||||||
from weakref import WeakValueDictionary
|
from weakref import WeakValueDictionary
|
||||||
|
|
||||||
from twisted.internet.interfaces import IReactorCore
|
from twisted.internet.task import LoopingCall
|
||||||
|
|
||||||
from synapse.metrics.background_process_metrics import wrap_as_background_process
|
from synapse.metrics.background_process_metrics import wrap_as_background_process
|
||||||
from synapse.storage._base import SQLBaseStore
|
from synapse.storage._base import SQLBaseStore
|
||||||
|
@ -26,6 +26,7 @@ from synapse.storage.database import (
|
||||||
LoggingDatabaseConnection,
|
LoggingDatabaseConnection,
|
||||||
LoggingTransaction,
|
LoggingTransaction,
|
||||||
)
|
)
|
||||||
|
from synapse.types import ISynapseReactor
|
||||||
from synapse.util import Clock
|
from synapse.util import Clock
|
||||||
from synapse.util.stringutils import random_string
|
from synapse.util.stringutils import random_string
|
||||||
|
|
||||||
|
@ -358,7 +359,7 @@ class Lock:
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
reactor: IReactorCore,
|
reactor: ISynapseReactor,
|
||||||
clock: Clock,
|
clock: Clock,
|
||||||
store: LockStore,
|
store: LockStore,
|
||||||
read_write: bool,
|
read_write: bool,
|
||||||
|
@ -377,19 +378,25 @@ class Lock:
|
||||||
|
|
||||||
self._table = "worker_read_write_locks" if read_write else "worker_locks"
|
self._table = "worker_read_write_locks" if read_write else "worker_locks"
|
||||||
|
|
||||||
self._looping_call = clock.looping_call(
|
# We might be called from a non-main thread, so we defer setting up the
|
||||||
self._renew,
|
# looping call.
|
||||||
_RENEWAL_INTERVAL_MS,
|
self._looping_call: Optional[LoopingCall] = None
|
||||||
store,
|
reactor.callFromThread(self._setup_looping_call)
|
||||||
clock,
|
|
||||||
read_write,
|
|
||||||
lock_name,
|
|
||||||
lock_key,
|
|
||||||
token,
|
|
||||||
)
|
|
||||||
|
|
||||||
self._dropped = False
|
self._dropped = False
|
||||||
|
|
||||||
|
def _setup_looping_call(self) -> None:
|
||||||
|
self._looping_call = self._clock.looping_call(
|
||||||
|
self._renew,
|
||||||
|
_RENEWAL_INTERVAL_MS,
|
||||||
|
self._store,
|
||||||
|
self._clock,
|
||||||
|
self._read_write,
|
||||||
|
self._lock_name,
|
||||||
|
self._lock_key,
|
||||||
|
self._token,
|
||||||
|
)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
@wrap_as_background_process("Lock._renew")
|
@wrap_as_background_process("Lock._renew")
|
||||||
async def _renew(
|
async def _renew(
|
||||||
|
@ -459,7 +466,7 @@ class Lock:
|
||||||
if self._dropped:
|
if self._dropped:
|
||||||
return
|
return
|
||||||
|
|
||||||
if self._looping_call.running:
|
if self._looping_call and self._looping_call.running:
|
||||||
self._looping_call.stop()
|
self._looping_call.stop()
|
||||||
|
|
||||||
await self._store.db_pool.simple_delete(
|
await self._store.db_pool.simple_delete(
|
||||||
|
@ -486,8 +493,9 @@ class Lock:
|
||||||
# We should not be dropped without the lock being released (unless
|
# We should not be dropped without the lock being released (unless
|
||||||
# we're shutting down), but if we are then let's at least stop
|
# we're shutting down), but if we are then let's at least stop
|
||||||
# renewing the lock.
|
# renewing the lock.
|
||||||
if self._looping_call.running:
|
if self._looping_call and self._looping_call.running:
|
||||||
self._looping_call.stop()
|
# We might be called from a non-main thread.
|
||||||
|
self._reactor.callFromThread(self._looping_call.stop)
|
||||||
|
|
||||||
if self._reactor.running:
|
if self._reactor.running:
|
||||||
logger.error(
|
logger.error(
|
||||||
|
|
|
@ -132,6 +132,7 @@ class LockTestCase(unittest.HomeserverTestCase):
|
||||||
|
|
||||||
# We simulate the process getting stuck by cancelling the looping call
|
# We simulate the process getting stuck by cancelling the looping call
|
||||||
# that keeps the lock active.
|
# that keeps the lock active.
|
||||||
|
assert lock._looping_call
|
||||||
lock._looping_call.stop()
|
lock._looping_call.stop()
|
||||||
|
|
||||||
# Wait for the lock to timeout.
|
# Wait for the lock to timeout.
|
||||||
|
@ -403,6 +404,7 @@ class ReadWriteLockTestCase(unittest.HomeserverTestCase):
|
||||||
|
|
||||||
# We simulate the process getting stuck by cancelling the looping call
|
# We simulate the process getting stuck by cancelling the looping call
|
||||||
# that keeps the lock active.
|
# that keeps the lock active.
|
||||||
|
assert lock._looping_call
|
||||||
lock._looping_call.stop()
|
lock._looping_call.stop()
|
||||||
|
|
||||||
# Wait for the lock to timeout.
|
# Wait for the lock to timeout.
|
||||||
|
|
Loading…
Reference in New Issue