Backport deferred.addTimeout

Twisted 16.0 doesn't have addTimeout, so let's backport it.
pull/3127/head
Richard van der Hoff 2018-04-27 12:52:30 +01:00
parent 1ea904b9f0
commit 9d2c1b8429
5 changed files with 90 additions and 14 deletions

View File

@ -28,7 +28,7 @@ class RequestTimedOutError(SynapseError):
def cancelled_to_request_timed_out_error(value):
"""Turns CancelledErrors into RequestTimedOutErrors.
For use with deferred.addTimeout()
For use with async.add_timeout_to_deferred
"""
if isinstance(value, failure.Failure):
value.trap(CancelledError)

View File

@ -20,6 +20,7 @@ from synapse.api.errors import (
CodeMessageException, MatrixCodeMessageException, SynapseError, Codes,
)
from synapse.http import cancelled_to_request_timed_out_error
from synapse.util.async import add_timeout_to_deferred
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.logcontext import make_deferred_yieldable
import synapse.metrics
@ -102,8 +103,9 @@ class SimpleHttpClient(object):
request_deferred = self.agent.request(
method, uri, *args, **kwargs
)
request_deferred.addTimeout(
60, reactor, cancelled_to_request_timed_out_error,
add_timeout_to_deferred(
request_deferred,
60, cancelled_to_request_timed_out_error,
)
response = yield make_deferred_yieldable(request_deferred)

View File

@ -22,7 +22,7 @@ from twisted.web._newclient import ResponseDone
from synapse.http import cancelled_to_request_timed_out_error
from synapse.http.endpoint import matrix_federation_endpoint
import synapse.metrics
from synapse.util.async import sleep
from synapse.util.async import sleep, add_timeout_to_deferred
from synapse.util import logcontext
from synapse.util.logcontext import make_deferred_yieldable
import synapse.util.retryutils
@ -193,9 +193,10 @@ class MatrixFederationHttpClient(object):
Headers(headers_dict),
producer
)
request_deferred.addTimeout(
add_timeout_to_deferred(
request_deferred,
timeout / 1000. if timeout else 60,
reactor, cancelled_to_request_timed_out_error,
cancelled_to_request_timed_out_error,
)
response = yield make_deferred_yieldable(
request_deferred,

View File

@ -13,15 +13,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer, reactor
from twisted.internet.defer import TimeoutError
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError
from synapse.handlers.presence import format_user_presence_state
from synapse.util.logutils import log_function
from synapse.util.async import ObservableDeferred
from synapse.util.async import (
ObservableDeferred, add_timeout_to_deferred,
DeferredTimeoutError,
)
from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
from synapse.util.metrics import Measure
from synapse.types import StreamToken
@ -332,8 +334,9 @@ class Notifier(object):
# Now we wait for the _NotifierUserStream to be told there
# is a new token.
listener = user_stream.new_listener(prev_token)
listener.deferred.addTimeout(
(end_time - now) / 1000., reactor,
add_timeout_to_deferred(
listener.deferred,
(end_time - now) / 1000.,
)
with PreserveLoggingContext():
yield listener.deferred
@ -347,7 +350,7 @@ class Notifier(object):
# Update the prev_token to the current_token since nothing
# has happened between the old prev_token and the current_token
prev_token = current_token
except TimeoutError:
except DeferredTimeoutError:
break
except defer.CancelledError:
break
@ -552,11 +555,14 @@ class Notifier(object):
if end_time <= now:
break
listener.deferred.addTimeout((end_time - now) / 1000., reactor)
add_timeout_to_deferred(
listener.deferred.addTimeout,
(end_time - now) / 1000.,
)
try:
with PreserveLoggingContext():
yield listener.deferred
except TimeoutError:
except DeferredTimeoutError:
break
except defer.CancelledError:
break

View File

@ -15,6 +15,8 @@
from twisted.internet import defer, reactor
from twisted.internet.defer import CancelledError
from twisted.python import failure
from .logcontext import (
PreserveLoggingContext, make_deferred_yieldable, preserve_fn
@ -392,3 +394,68 @@ class ReadWriteLock(object):
self.key_to_current_writer.pop(key)
defer.returnValue(_ctx_manager())
class DeferredTimeoutError(Exception):
"""
This error is raised by default when a L{Deferred} times out.
"""
def add_timeout_to_deferred(deferred, timeout, on_timeout_cancel=None):
"""
Add a timeout to a deferred by scheduling it to be cancelled after
timeout seconds.
This is essentially a backport of deferred.addTimeout, which was introduced
in twisted 16.5.
If the deferred gets timed out, it errbacks with a DeferredTimeoutError,
unless a cancelable function was passed to its initialization or unless
a different on_timeout_cancel callable is provided.
Args:
deferred (defer.Deferred): deferred to be timed out
timeout (Number): seconds to time out after
on_timeout_cancel (callable): A callable which is called immediately
after the deferred times out, and not if this deferred is
otherwise cancelled before the timeout.
It takes an arbitrary value, which is the value of the deferred at
that exact point in time (probably a CancelledError Failure), and
the timeout.
The default callable (if none is provided) will translate a
CancelledError Failure into a DeferredTimeoutError.
"""
timed_out = [False]
def time_it_out():
timed_out[0] = True
deferred.cancel()
delayed_call = reactor.callLater(timeout, time_it_out)
def convert_cancelled(value):
if timed_out[0]:
to_call = on_timeout_cancel or _cancelled_to_timed_out_error
return to_call(value, timeout)
return value
deferred.addBoth(convert_cancelled)
def cancel_timeout(result):
# stop the pending call to cancel the deferred if it's been fired
if delayed_call.active():
delayed_call.cancel()
return result
deferred.addBoth(cancel_timeout)
def _cancelled_to_timed_out_error(value, timeout):
if isinstance(value, failure.Failure):
value.trap(CancelledError)
raise DeferredTimeoutError(timeout, "Deferred")
return value