mirror of https://github.com/tootsuite/mastodon
				
				
				
			
		
			
				
	
	
		
			115 lines
		
	
	
		
			2.4 KiB
		
	
	
	
		
			Ruby
		
	
	
			
		
		
	
	
			115 lines
		
	
	
		
			2.4 KiB
		
	
	
	
		
			Ruby
		
	
	
| # frozen_string_literal: true
 | |
| 
 | |
| require_relative './connection_pool/shared_connection_pool'
 | |
| 
 | |
| class RequestPool
 | |
|   def self.current
 | |
|     @current ||= RequestPool.new
 | |
|   end
 | |
| 
 | |
|   class Reaper
 | |
|     attr_reader :pool, :frequency
 | |
| 
 | |
|     def initialize(pool, frequency)
 | |
|       @pool      = pool
 | |
|       @frequency = frequency
 | |
|     end
 | |
| 
 | |
|     def run
 | |
|       return unless frequency&.positive?
 | |
| 
 | |
|       Thread.new(frequency, pool) do |t, p|
 | |
|         loop do
 | |
|           sleep t
 | |
|           p.flush
 | |
|         end
 | |
|       end
 | |
|     end
 | |
|   end
 | |
| 
 | |
|   MAX_IDLE_TIME = 30
 | |
|   WAIT_TIMEOUT  = 5
 | |
|   MAX_POOL_SIZE = ENV.fetch('MAX_REQUEST_POOL_SIZE', 512).to_i
 | |
| 
 | |
|   class Connection
 | |
|     attr_reader :site, :last_used_at, :created_at, :in_use, :dead, :fresh
 | |
| 
 | |
|     def initialize(site)
 | |
|       @site         = site
 | |
|       @http_client  = http_client
 | |
|       @last_used_at = nil
 | |
|       @created_at   = current_time
 | |
|       @dead         = false
 | |
|       @fresh        = true
 | |
|     end
 | |
| 
 | |
|     def use
 | |
|       @last_used_at = current_time
 | |
|       @in_use       = true
 | |
| 
 | |
|       retries = 0
 | |
| 
 | |
|       begin
 | |
|         yield @http_client
 | |
|       rescue HTTP::ConnectionError
 | |
|         # It's possible the connection was closed, so let's
 | |
|         # try re-opening it once
 | |
| 
 | |
|         close
 | |
| 
 | |
|         if @fresh || retries.positive?
 | |
|           raise
 | |
|         else
 | |
|           @http_client = http_client
 | |
|           retries     += 1
 | |
|           retry
 | |
|         end
 | |
|       rescue StandardError
 | |
|         # If this connection raises errors of any kind, it's
 | |
|         # better if it gets reaped as soon as possible
 | |
| 
 | |
|         close
 | |
|         @dead = true
 | |
|         raise
 | |
|       end
 | |
|     ensure
 | |
|       @fresh  = false
 | |
|       @in_use = false
 | |
|     end
 | |
| 
 | |
|     def seconds_idle
 | |
|       current_time - (@last_used_at || @created_at)
 | |
|     end
 | |
| 
 | |
|     def close
 | |
|       @http_client.close
 | |
|     end
 | |
| 
 | |
|     private
 | |
| 
 | |
|     def http_client
 | |
|       Request.http_client.persistent(@site, timeout: MAX_IDLE_TIME)
 | |
|     end
 | |
| 
 | |
|     def current_time
 | |
|       Process.clock_gettime(Process::CLOCK_MONOTONIC)
 | |
|     end
 | |
|   end
 | |
| 
 | |
|   def initialize
 | |
|     @pool   = ConnectionPool::SharedConnectionPool.new(size: MAX_POOL_SIZE, timeout: WAIT_TIMEOUT) { |site| Connection.new(site) }
 | |
|     @reaper = Reaper.new(self, 30)
 | |
|     @reaper.run
 | |
|   end
 | |
| 
 | |
|   def with(site, &block)
 | |
|     @pool.with(site) do |connection|
 | |
|       ActiveSupport::Notifications.instrument('with.request_pool', miss: connection.fresh, host: connection.site) do
 | |
|         connection.use(&block)
 | |
|       end
 | |
|     end
 | |
|   end
 | |
| 
 | |
|   delegate :size, :flush, to: :@pool
 | |
| end
 |