Reduce the size of the HTTP connection pool for non-pushers. (#15514)
Pushers tend to make many connections to the same HTTP host (e.g. a new event comes in, causes events to be pushed, and then the homeserver connects to the same host many times). Due to this the per-host HTTP connection pool size was increased, but this does not make sense for other SimpleHttpClients. Add a parameter for the connection pool and override it for pushers (making a separate SimpleHttpClient for pushers with the increased configuration). This returns the HTTP connection pool settings to the default Twisted ones for non-pusher HTTP clients.erikj/epa_delete
							parent
							
								
									07b1c70d6b
								
							
						
					
					
						commit
						6aca4e7cb8
					
				| 
						 | 
				
			
			@ -0,0 +1 @@
 | 
			
		|||
Reduce the size of the HTTP connection pool for non-pushers.
 | 
			
		||||
| 
						 | 
				
			
			@ -768,6 +768,7 @@ class SimpleHttpClient(BaseHttpClient):
 | 
			
		|||
           request if it were otherwise caught in a blacklist.
 | 
			
		||||
        use_proxy: Whether proxy settings should be discovered and used
 | 
			
		||||
            from conventional environment variables.
 | 
			
		||||
        connection_pool: The connection pool to use for this client's agent.
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    def __init__(
 | 
			
		||||
| 
						 | 
				
			
			@ -777,6 +778,7 @@ class SimpleHttpClient(BaseHttpClient):
 | 
			
		|||
        ip_whitelist: Optional[IPSet] = None,
 | 
			
		||||
        ip_blacklist: Optional[IPSet] = None,
 | 
			
		||||
        use_proxy: bool = False,
 | 
			
		||||
        connection_pool: Optional[HTTPConnectionPool] = None,
 | 
			
		||||
    ):
 | 
			
		||||
        super().__init__(hs, treq_args=treq_args)
 | 
			
		||||
        self._ip_whitelist = ip_whitelist
 | 
			
		||||
| 
						 | 
				
			
			@ -789,22 +791,12 @@ class SimpleHttpClient(BaseHttpClient):
 | 
			
		|||
                self.reactor, self._ip_whitelist, self._ip_blacklist
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        # the pusher makes lots of concurrent SSL connections to Sygnal, and tends to
 | 
			
		||||
        # do so in batches, so we need to allow the pool to keep lots of idle
 | 
			
		||||
        # connections around.
 | 
			
		||||
        pool = HTTPConnectionPool(self.reactor)
 | 
			
		||||
        # XXX: The justification for using the cache factor here is that larger
 | 
			
		||||
        # instances will need both more cache and more connections.
 | 
			
		||||
        # Still, this should probably be a separate dial
 | 
			
		||||
        pool.maxPersistentPerHost = max(int(100 * hs.config.caches.global_factor), 5)
 | 
			
		||||
        pool.cachedConnectionTimeout = 2 * 60
 | 
			
		||||
 | 
			
		||||
        self.agent: IAgent = ProxyAgent(
 | 
			
		||||
            self.reactor,
 | 
			
		||||
            hs.get_reactor(),
 | 
			
		||||
            connectTimeout=15,
 | 
			
		||||
            contextFactory=self.hs.get_http_client_context_factory(),
 | 
			
		||||
            pool=pool,
 | 
			
		||||
            pool=connection_pool,
 | 
			
		||||
            use_proxy=use_proxy,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -140,7 +140,8 @@ class HttpPusher(Pusher):
 | 
			
		|||
            )
 | 
			
		||||
 | 
			
		||||
        self.url = url
 | 
			
		||||
        self.http_client = hs.get_proxied_blacklisted_http_client()
 | 
			
		||||
        self.http_client = hs.get_pusher_http_client()
 | 
			
		||||
 | 
			
		||||
        self.data_minus_url = {}
 | 
			
		||||
        self.data_minus_url.update(self.data)
 | 
			
		||||
        del self.data_minus_url["url"]
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -27,6 +27,7 @@ from typing_extensions import TypeAlias
 | 
			
		|||
 | 
			
		||||
from twisted.internet.interfaces import IOpenSSLContextFactory
 | 
			
		||||
from twisted.internet.tcp import Port
 | 
			
		||||
from twisted.web.client import HTTPConnectionPool
 | 
			
		||||
from twisted.web.iweb import IPolicyForHTTPS
 | 
			
		||||
from twisted.web.resource import Resource
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -453,6 +454,26 @@ class HomeServer(metaclass=abc.ABCMeta):
 | 
			
		|||
            use_proxy=True,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    @cache_in_self
 | 
			
		||||
    def get_pusher_http_client(self) -> SimpleHttpClient:
 | 
			
		||||
        # the pusher makes lots of concurrent SSL connections to Sygnal, and tends to
 | 
			
		||||
        # do so in batches, so we need to allow the pool to keep lots of idle
 | 
			
		||||
        # connections around.
 | 
			
		||||
        pool = HTTPConnectionPool(self.get_reactor())
 | 
			
		||||
        # XXX: The justification for using the cache factor here is that larger
 | 
			
		||||
        # instances will need both more cache and more connections.
 | 
			
		||||
        # Still, this should probably be a separate dial
 | 
			
		||||
        pool.maxPersistentPerHost = max(int(100 * self.config.caches.global_factor), 5)
 | 
			
		||||
        pool.cachedConnectionTimeout = 2 * 60
 | 
			
		||||
 | 
			
		||||
        return SimpleHttpClient(
 | 
			
		||||
            self,
 | 
			
		||||
            ip_whitelist=self.config.server.ip_range_whitelist,
 | 
			
		||||
            ip_blacklist=self.config.server.ip_range_blacklist,
 | 
			
		||||
            use_proxy=True,
 | 
			
		||||
            connection_pool=pool,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    @cache_in_self
 | 
			
		||||
    def get_federation_http_client(self) -> MatrixFederationHttpClient:
 | 
			
		||||
        """
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -52,7 +52,7 @@ class HTTPPusherTests(HomeserverTestCase):
 | 
			
		|||
 | 
			
		||||
        m.post_json_get_json = post_json_get_json
 | 
			
		||||
 | 
			
		||||
        hs = self.setup_test_homeserver(proxied_blacklisted_http_client=m)
 | 
			
		||||
        hs = self.setup_test_homeserver(pusher_http_client=m)
 | 
			
		||||
 | 
			
		||||
        return hs
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -93,7 +93,7 @@ class PusherShardTestCase(BaseMultiWorkerStreamTestCase):
 | 
			
		|||
        self.make_worker_hs(
 | 
			
		||||
            "synapse.app.generic_worker",
 | 
			
		||||
            {"worker_name": "pusher1", "pusher_instances": ["pusher1"]},
 | 
			
		||||
            proxied_blacklisted_http_client=http_client_mock,
 | 
			
		||||
            pusher_http_client=http_client_mock,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        event_id = self._create_pusher_and_send_msg("user")
 | 
			
		||||
| 
						 | 
				
			
			@ -126,7 +126,7 @@ class PusherShardTestCase(BaseMultiWorkerStreamTestCase):
 | 
			
		|||
                "worker_name": "pusher1",
 | 
			
		||||
                "pusher_instances": ["pusher1", "pusher2"],
 | 
			
		||||
            },
 | 
			
		||||
            proxied_blacklisted_http_client=http_client_mock1,
 | 
			
		||||
            pusher_http_client=http_client_mock1,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        http_client_mock2 = Mock(spec_set=["post_json_get_json"])
 | 
			
		||||
| 
						 | 
				
			
			@ -140,7 +140,7 @@ class PusherShardTestCase(BaseMultiWorkerStreamTestCase):
 | 
			
		|||
                "worker_name": "pusher2",
 | 
			
		||||
                "pusher_instances": ["pusher1", "pusher2"],
 | 
			
		||||
            },
 | 
			
		||||
            proxied_blacklisted_http_client=http_client_mock2,
 | 
			
		||||
            pusher_http_client=http_client_mock2,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        # We choose a user name that we know should go to pusher1.
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue