diff --git a/changelog.d/4690.bugfix b/changelog.d/4690.bugfix new file mode 100644 index 0000000000..e4cfc5e413 --- /dev/null +++ b/changelog.d/4690.bugfix @@ -0,0 +1 @@ +Fix TaskStopped exceptions in logs when outbound requests time out. \ No newline at end of file diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py index a3f9e4f67c..d36bcd6336 100644 --- a/synapse/http/__init__.py +++ b/synapse/http/__init__.py @@ -15,8 +15,10 @@ # limitations under the License. import re +from twisted.internet import task from twisted.internet.defer import CancelledError from twisted.python import failure +from twisted.web.client import FileBodyProducer from synapse.api.errors import SynapseError @@ -47,3 +49,16 @@ def redact_uri(uri): r'\1\3', uri ) + + +class QuieterFileBodyProducer(FileBodyProducer): + """Wrapper for FileBodyProducer that avoids CRITICAL errors when the connection drops. + + Workaround for https://github.com/matrix-org/synapse/issues/4003 / + https://twistedmatrix.com/trac/ticket/6528 + """ + def stopProducing(self): + try: + FileBodyProducer.stopProducing(self) + except task.TaskStopped: + pass diff --git a/synapse/http/client.py b/synapse/http/client.py index 47a1f82ff0..ad454f4964 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -15,6 +15,7 @@ # limitations under the License. import logging +from io import BytesIO from six import text_type from six.moves import urllib @@ -39,7 +40,11 @@ from twisted.web.http import PotentialDataLoss from twisted.web.http_headers import Headers from synapse.api.errors import Codes, HttpResponseException, SynapseError -from synapse.http import cancelled_to_request_timed_out_error, redact_uri +from synapse.http import ( + QuieterFileBodyProducer, + cancelled_to_request_timed_out_error, + redact_uri, +) from synapse.util.async_helpers import timeout_deferred from synapse.util.caches import CACHE_SIZE_FACTOR from synapse.util.logcontext import make_deferred_yieldable @@ -246,7 +251,7 @@ class SimpleHttpClient(object): ) @defer.inlineCallbacks - def request(self, method, uri, data=b'', headers=None): + def request(self, method, uri, data=None, headers=None): """ Args: method (str): HTTP method to use. @@ -265,11 +270,15 @@ class SimpleHttpClient(object): logger.info("Sending request %s %s", method, redact_uri(uri)) try: + body_producer = None + if data is not None: + body_producer = QuieterFileBodyProducer(BytesIO(data)) + request_deferred = treq.request( method, uri, agent=self.agent, - data=data, + data=body_producer, headers=headers, **self._extra_treq_args ) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 3c24bf3805..1682c9af13 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -28,11 +28,10 @@ from canonicaljson import encode_canonical_json from prometheus_client import Counter from signedjson.sign import sign_json -from twisted.internet import defer, protocol, task +from twisted.internet import defer, protocol from twisted.internet.error import DNSLookupError from twisted.internet.task import _EPSILON, Cooperator from twisted.web._newclient import ResponseDone -from twisted.web.client import FileBodyProducer from twisted.web.http_headers import Headers import synapse.metrics @@ -44,6 +43,7 @@ from synapse.api.errors import ( RequestSendFailed, SynapseError, ) +from synapse.http import QuieterFileBodyProducer from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent from synapse.util.async_helpers import timeout_deferred from synapse.util.logcontext import make_deferred_yieldable @@ -839,16 +839,3 @@ def encode_query_args(args): query_bytes = urllib.parse.urlencode(encoded_args, True) return query_bytes.encode('utf8') - - -class QuieterFileBodyProducer(FileBodyProducer): - """Wrapper for FileBodyProducer that avoids CRITICAL errors when the connection drops. - - Workaround for https://github.com/matrix-org/synapse/issues/4003 / - https://twistedmatrix.com/trac/ticket/6528 - """ - def stopProducing(self): - try: - FileBodyProducer.stopProducing(self) - except task.TaskStopped: - pass