diff --git a/synapse/util/async.py b/synapse/util/async.py index 5d0fb39130..7d5acecb1c 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -248,11 +248,15 @@ class Limiter(object): # do some work. """ - def __init__(self, max_count): + def __init__(self, max_count, clock=None): """ Args: max_count(int): The maximum number of concurrent access """ + if not clock: + from twisted.internet import reactor + clock = Clock(reactor) + self._clock = clock self.max_count = max_count # key_to_defer is a map from the key to a 2 element list where @@ -277,10 +281,23 @@ class Limiter(object): with PreserveLoggingContext(): yield new_defer logger.info("Acquired limiter lock for key %r", key) + entry[0] += 1 + + # if the code holding the lock completes synchronously, then it + # will recursively run the next claimant on the list. That can + # relatively rapidly lead to stack exhaustion. This is essentially + # the same problem as http://twistedmatrix.com/trac/ticket/9304. + # + # In order to break the cycle, we add a cheeky sleep(0) here to + # ensure that we fall back to the reactor between each iteration. + # + # (This needs to happen while we hold the lock, and the context manager's exit + # code must be synchronous, so this is the only sensible place.) + yield self._clock.sleep(0) + else: logger.info("Acquired uncontended limiter lock for key %r", key) - - entry[0] += 1 + entry[0] += 1 @contextmanager def _ctx_manager(): diff --git a/tests/util/test_limiter.py b/tests/util/test_limiter.py index a5a767b1ff..f7b942f5c1 100644 --- a/tests/util/test_limiter.py +++ b/tests/util/test_limiter.py @@ -48,21 +48,21 @@ class LimiterTestCase(unittest.TestCase): self.assertFalse(d4.called) self.assertFalse(d5.called) - self.assertTrue(d4.called) + cm4 = yield d4 self.assertFalse(d5.called) with cm3: self.assertFalse(d5.called) - self.assertTrue(d5.called) + cm5 = yield d5 with cm2: pass - with (yield d4): + with cm4: pass - with (yield d5): + with cm5: pass d6 = limiter.queue(key)