Merge branch 'rav/fix_linearizer_cancellation' into develop

pull/3681/head
Richard van der Hoff 2018-08-10 14:57:27 +01:00
commit c31793a784
2 changed files with 69 additions and 43 deletions

changelog.d/3676.bugfix (new file)

@@ -0,0 +1 @@
+Make the tests pass on Twisted < 18.7.0
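
Background, not part of the commit: the changelog entry refers to https://twistedmatrix.com/trac/ticket/4632: before Twisted 18.7, cancelling the Deferred returned by an inlineCallbacks function did not propagate the cancellation into the generator. A minimal standalone sketch of the behaviour being worked around (plain Twisted, no Synapse code):

    from twisted.internet import defer

    @defer.inlineCallbacks
    def wait_for(d):
        try:
            yield d
        except defer.CancelledError:
            # on Twisted < 18.7 this block was never reached when the outer
            # deferred was cancelled; from 18.7, cancelling `outer` cancels
            # `d` and the CancelledError is raised here
            raise

    inner = defer.Deferred()
    outer = wait_for(inner)
    outer.addErrback(lambda f: f.trap(defer.CancelledError))  # consume the failure
    outer.cancel()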

@@ -188,62 +188,30 @@ class Linearizer(object):
         # things blocked from executing.
         self.key_to_defer = {}
 
-    @defer.inlineCallbacks
     def queue(self, key):
+        # we avoid doing defer.inlineCallbacks here, so that cancellation works correctly.
+        # (https://twistedmatrix.com/trac/ticket/4632 meant that cancellations were not
+        # propagated inside inlineCallbacks until Twisted 18.7)
         entry = self.key_to_defer.setdefault(key, [0, collections.OrderedDict()])
 
         # If the number of things executing is greater than the maximum
         # then add a deferred to the list of blocked items
-        # When on of the things currently executing finishes it will callback
+        # When one of the things currently executing finishes it will callback
         # this item so that it can continue executing.
         if entry[0] >= self.max_count:
-            new_defer = defer.Deferred()
-            entry[1][new_defer] = 1
-
-            logger.info(
-                "Waiting to acquire linearizer lock %r for key %r", self.name, key,
-            )
-            try:
-                yield make_deferred_yieldable(new_defer)
-            except Exception as e:
-                if isinstance(e, CancelledError):
-                    logger.info(
-                        "Cancelling wait for linearizer lock %r for key %r",
-                        self.name, key,
-                    )
-                else:
-                    logger.warn(
-                        "Unexpected exception waiting for linearizer lock %r for key %r",
-                        self.name, key,
-                    )
-
-                # we just have to take ourselves back out of the queue.
-                del entry[1][new_defer]
-                raise
-
-            logger.info("Acquired linearizer lock %r for key %r", self.name, key)
-            entry[0] += 1
-
-            # if the code holding the lock completes synchronously, then it
-            # will recursively run the next claimant on the list. That can
-            # relatively rapidly lead to stack exhaustion. This is essentially
-            # the same problem as http://twistedmatrix.com/trac/ticket/9304.
-            #
-            # In order to break the cycle, we add a cheeky sleep(0) here to
-            # ensure that we fall back to the reactor between each iteration.
-            #
-            # (This needs to happen while we hold the lock, and the context manager's exit
-            # code must be synchronous, so this is the only sensible place.)
-            yield self._clock.sleep(0)
+            res = self._await_lock(key)
         else:
             logger.info(
                 "Acquired uncontended linearizer lock %r for key %r", self.name, key,
             )
             entry[0] += 1
+            res = defer.succeed(None)
 
+        # once we successfully get the lock, we need to return a context manager which
+        # will release the lock.
         @contextmanager
-        def _ctx_manager():
+        def _ctx_manager(_):
             try:
                 yield
             finally:
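
Not part of the diff: queue() now returns an ordinary Deferred that fires with the context manager once the lock is held, so a caller can be cancelled while it is still queued. An illustrative caller sketch; handle_request and do_work are hypothetical names, while the `with (yield ...)` idiom matches how queue() is consumed:

    from twisted.internet import defer

    @defer.inlineCallbacks
    def handle_request(linearizer, key):
        # this yield can now be cancelled while we wait our turn; the errback
        # added in _await_lock (second hunk, below) removes us from the queue
        with (yield linearizer.queue(key)):
            yield do_work()  # hypothetical work; the lock is released on exit
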
@@ -264,7 +232,64 @@ class Linearizer(object):
                 # map.
                 del self.key_to_defer[key]
 
-        defer.returnValue(_ctx_manager())
+        res.addCallback(_ctx_manager)
+        return res
+
+    def _await_lock(self, key):
+        """Helper for queue: adds a deferred to the queue
+
+        Assumes that we've already checked that we've reached the limit of the number
+        of lock-holders we allow. Creates a new deferred which is added to the list, and
+        adds some management around cancellations.
+
+        Returns the deferred, which will callback once we have secured the lock.
+        """
+        entry = self.key_to_defer[key]
+
+        logger.info(
+            "Waiting to acquire linearizer lock %r for key %r", self.name, key,
+        )
+
+        new_defer = make_deferred_yieldable(defer.Deferred())
+        entry[1][new_defer] = 1
+
+        def cb(_r):
+            logger.info("Acquired linearizer lock %r for key %r", self.name, key)
+            entry[0] += 1
+
+            # if the code holding the lock completes synchronously, then it
+            # will recursively run the next claimant on the list. That can
+            # relatively rapidly lead to stack exhaustion. This is essentially
+            # the same problem as http://twistedmatrix.com/trac/ticket/9304.
+            #
+            # In order to break the cycle, we add a cheeky sleep(0) here to
+            # ensure that we fall back to the reactor between each iteration.
+            #
+            # (This needs to happen while we hold the lock, and the context manager's exit
+            # code must be synchronous, so this is the only sensible place.)
+            return self._clock.sleep(0)
+
+        def eb(e):
+            logger.info("defer %r got err %r", new_defer, e)
+            if isinstance(e, CancelledError):
+                logger.info(
+                    "Cancelling wait for linearizer lock %r for key %r",
+                    self.name, key,
+                )
+            else:
+                logger.warn(
+                    "Unexpected exception waiting for linearizer lock %r for key %r",
+                    self.name, key,
+                )
+
+            # we just have to take ourselves back out of the queue.
+            del entry[1][new_defer]
+            return e
+
+        new_defer.addCallbacks(cb, eb)
+        return new_defer
 
 
 class ReadWriteLock(object):
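
Not part of the diff: the stack-exhaustion problem that the sleep(0) in cb() guards against can be reproduced with plain Deferreds. In this standalone sketch (assumed for illustration, unrelated to the Synapse code), each waiter synchronously wakes the next from inside its own callback, so the queue unwinds as one nested call chain, which is essentially the problem in https://twistedmatrix.com/trac/ticket/9304; yielding clock.sleep(0) between claimants instead re-enters via the reactor and keeps the stack flat.

    from twisted.internet import defer

    waiters = [defer.Deferred() for _ in range(5000)]

    def wake_next(_result, i):
        # fire waiter i+1 from inside waiter i's callback frame
        if i + 1 < len(waiters):
            waiters[i + 1].callback(None)

    for i, d in enumerate(waiters):
        d.addCallback(wake_next, i)

    # one Python frame per waiter: the eventual RecursionError is captured
    # as a Failure by the Deferred machinery deep in the chain
    waiters[0].callback(None)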