diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 250bb1ef91..980e912348 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -32,7 +32,7 @@ from twisted.internet import defer, protocol from twisted.internet.error import DNSLookupError from twisted.internet.task import _EPSILON, Cooperator from twisted.web._newclient import ResponseDone -from twisted.web.client import Agent, FileBodyProducer, HTTPConnectionPool +from twisted.web.client import FileBodyProducer from twisted.web.http_headers import Headers import synapse.metrics @@ -44,7 +44,7 @@ from synapse.api.errors import ( RequestSendFailed, SynapseError, ) -from synapse.http.endpoint import matrix_federation_endpoint +from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent from synapse.util.async_helpers import timeout_deferred from synapse.util.logcontext import make_deferred_yieldable from synapse.util.metrics import Measure @@ -66,20 +66,6 @@ else: MAXINT = sys.maxint -class MatrixFederationEndpointFactory(object): - def __init__(self, hs): - self.reactor = hs.get_reactor() - self.tls_client_options_factory = hs.tls_client_options_factory - - def endpointForURI(self, uri): - destination = uri.netloc.decode('ascii') - - return matrix_federation_endpoint( - self.reactor, destination, timeout=10, - tls_client_options_factory=self.tls_client_options_factory - ) - - _next_id = 1 @@ -187,12 +173,10 @@ class MatrixFederationHttpClient(object): self.signing_key = hs.config.signing_key[0] self.server_name = hs.hostname reactor = hs.get_reactor() - pool = HTTPConnectionPool(reactor) - pool.retryAutomatically = False - pool.maxPersistentPerHost = 5 - pool.cachedConnectionTimeout = 2 * 60 - self.agent = Agent.usingEndpointFactory( - reactor, MatrixFederationEndpointFactory(hs), pool=pool + + self.agent = MatrixFederationAgent( + hs.get_reactor(), + hs.tls_client_options_factory, ) self.clock = hs.get_clock() self._store = hs.get_datastore() @@ -316,9 +300,9 @@ class MatrixFederationHttpClient(object): headers_dict[b"Authorization"] = auth_headers logger.info( - "{%s} [%s] Sending request: %s %s", + "{%s} [%s] Sending request: %s %s; timeout %fs", request.txn_id, request.destination, request.method, - url_str, + url_str, _sec_timeout, ) try: @@ -338,12 +322,11 @@ class MatrixFederationHttpClient(object): reactor=self.hs.get_reactor(), ) - response = yield make_deferred_yieldable( - request_deferred, - ) + response = yield request_deferred except DNSLookupError as e: raise_from(RequestSendFailed(e, can_retry=retry_on_dns_fail), e) except Exception as e: + logger.info("Failed to send request: %s", e) raise_from(RequestSendFailed(e, can_retry=True), e) logger.info( diff --git a/tests/http/test_fedclient.py b/tests/http/test_fedclient.py index 8426eee400..d37f8f9981 100644 --- a/tests/http/test_fedclient.py +++ b/tests/http/test_fedclient.py @@ -15,6 +15,7 @@ from mock import Mock +from twisted.internet import defer from twisted.internet.defer import TimeoutError from twisted.internet.error import ConnectingCancelledError, DNSLookupError from twisted.test.proto_helpers import StringTransport @@ -26,11 +27,20 @@ from synapse.http.matrixfederationclient import ( MatrixFederationHttpClient, MatrixFederationRequest, ) +from synapse.util.logcontext import LoggingContext from tests.server import FakeTransport from tests.unittest import HomeserverTestCase +def check_logcontext(context): + current = LoggingContext.current_context() + if current is not context: + raise AssertionError( + "Expected logcontext %s but was %s" % (context, current), + ) + + class FederationClientTests(HomeserverTestCase): def make_homeserver(self, reactor, clock): @@ -43,6 +53,70 @@ class FederationClientTests(HomeserverTestCase): self.cl = MatrixFederationHttpClient(self.hs) self.reactor.lookups["testserv"] = "1.2.3.4" + def test_client_get(self): + """ + happy-path test of a GET request + """ + @defer.inlineCallbacks + def do_request(): + with LoggingContext("one") as context: + fetch_d = self.cl.get_json("testserv:8008", "foo/bar") + + # Nothing happened yet + self.assertNoResult(fetch_d) + + # should have reset logcontext to the sentinel + check_logcontext(LoggingContext.sentinel) + + try: + fetch_res = yield fetch_d + defer.returnValue(fetch_res) + finally: + check_logcontext(context) + + test_d = do_request() + + self.pump() + + # Nothing happened yet + self.assertNoResult(test_d) + + # Make sure treq is trying to connect + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 1) + (host, port, factory, _timeout, _bindAddress) = clients[0] + self.assertEqual(host, '1.2.3.4') + self.assertEqual(port, 8008) + + # complete the connection and wire it up to a fake transport + protocol = factory.buildProtocol(None) + transport = StringTransport() + protocol.makeConnection(transport) + + # that should have made it send the request to the transport + self.assertRegex(transport.value(), b"^GET /foo/bar") + + # Deferred is still without a result + self.assertNoResult(test_d) + + # Send it the HTTP response + res_json = '{ "a": 1 }'.encode('ascii') + protocol.dataReceived( + b"HTTP/1.1 200 OK\r\n" + b"Server: Fake\r\n" + b"Content-Type: application/json\r\n" + b"Content-Length: %i\r\n" + b"\r\n" + b"%s" % (len(res_json), res_json) + ) + + self.pump() + + res = self.successResultOf(test_d) + + # check the response is as expected + self.assertEqual(res, {"a": 1}) + def test_dns_error(self): """ If the DNS lookup returns an error, it will bubble up. @@ -54,6 +128,28 @@ class FederationClientTests(HomeserverTestCase): self.assertIsInstance(f.value, RequestSendFailed) self.assertIsInstance(f.value.inner_exception, DNSLookupError) + def test_client_connection_refused(self): + d = self.cl.get_json("testserv:8008", "foo/bar", timeout=10000) + + self.pump() + + # Nothing happened yet + self.assertNoResult(d) + + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 1) + (host, port, factory, _timeout, _bindAddress) = clients[0] + self.assertEqual(host, '1.2.3.4') + self.assertEqual(port, 8008) + e = Exception("go away") + factory.clientConnectionFailed(None, e) + self.pump(0.5) + + f = self.failureResultOf(d) + + self.assertIsInstance(f.value, RequestSendFailed) + self.assertIs(f.value.inner_exception, e) + def test_client_never_connect(self): """ If the HTTP request is not connected and is timed out, it'll give a