class Mongo::Server::ConnectionPool::Queue
A LIFO queue of connections to be used by the connection pool. This is based on mperham's connection pool.
@since 2.0.0
Constants
- MAX_SIZE
The default max size for the queue.
- MIN_SIZE
The default min size for the queue.
- WAIT_TIMEOUT
The default timeout, in seconds, to wait for a connection.
Attributes
@return [ Mutex ] mutex The mutex used for synchronization.
@return [ Hash ] options The options.
@return [ Array ] queue The underlying array of connections.
@return [ ConditionVariable ] resource The condition variable used to signal waiting threads when a connection is checked in.
Public Class Methods
Initialize the new queue. Calls the block once for each connection needed to fill the queue to its minimum size.
@example Create the queue.
Mongo::Server::ConnectionPool::Queue.new(max_pool_size: 5) { Connection.new }
@param [ Hash ] options The options.
@option options [ Integer ] :max_pool_size The maximum size.
@option options [ Integer ] :min_pool_size The minimum size.
@option options [ Float ] :wait_queue_timeout The time to wait, in seconds, for a free connection.
@since 2.0.0
# File lib/mongo/server/connection_pool/queue.rb, line 109
def initialize(options = {}, &block)
  @block = block
  @connections = 0
  @options = options
  @queue = Array.new(min_size) { create_connection }
  @mutex = Mutex.new
  @resource = ConditionVariable.new
end
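To illustrate how the constructor pre-fills the queue, here is a minimal sketch using a stand-in class. `TinyQueue` is hypothetical and not part of the driver, and the default sizes of 1 and 5 are assumptions made for the example, since this page does not state the constant values.

```ruby
# Hypothetical stand-in for the queue; it only mirrors the pre-fill step.
class TinyQueue
  attr_reader :queue, :created

  def initialize(options = {}, &block)
    @block = block
    @created = 0
    @max = options[:max_pool_size] || 5 # assumed default
    min = options[:min_pool_size] || 1  # assumed default
    # The block runs once per slot in the initial array.
    @queue = Array.new(min) { create }
  end

  private

  # Create an item only while the total stays below the maximum.
  def create
    return unless @created < @max
    @created += 1
    @block.call
  end
end

calls = 0
tiny = TinyQueue.new(min_pool_size: 3, max_pool_size: 5) do
  calls += 1
  Object.new
end

puts calls           # number of times the block was called
puts tiny.queue.size # number of pre-created items
```

In this sketch the block runs three times, once per initial slot, which matches the `Array.new(min_size) { create_connection }` call in the listing above.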
Public Instance Methods
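Close sockets that have been idle since their last check-in for longer than the max idle time, if the :max_idle_time option is set. Stale connections are disconnected, and those needed to keep the pool at its minimum size are reconnected.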
@example Close the stale sockets
queue.close_stale_sockets!
@since 2.5.0
# File lib/mongo/server/connection_pool/queue.rb, line 186
def close_stale_sockets!
  return unless max_idle_time

  to_refresh = []
  queue.each do |connection|
    if last_checkin = connection.last_checkin
      if (Time.now - last_checkin) > max_idle_time
        to_refresh << connection
      end
    end
  end

  mutex.synchronize do
    num_checked_out = @connections - queue.size
    min_size_delta = [(min_size - num_checked_out), 0].max

    to_refresh.each do |connection|
      if queue.include?(connection)
        connection.disconnect!
        if queue.index(connection) < min_size_delta
          begin; connection.connect!; rescue; end
        end
      end
    end
  end
end
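The reconnect decision rests on a small piece of arithmetic: how many queued connections must stay connected so that checked-out plus queued connections still cover the minimum size. The sketch below isolates that calculation. The helper name `reconnect_slots` is hypothetical and does not exist in the driver.

```ruby
# Hypothetical helper mirroring the min_size_delta calculation.
# total_connections: connections created so far (checked out + queued)
# queued:            connections currently sitting in the queue
def reconnect_slots(min_size, total_connections, queued)
  num_checked_out = total_connections - queued
  # Never negative: if enough connections are checked out, nothing
  # in the queue needs to be reconnected.
  [min_size - num_checked_out, 0].max
end

few_checked_out  = reconnect_slots(3, 5, 4) # 1 checked out
many_checked_out = reconnect_slots(3, 5, 1) # 4 checked out

puts few_checked_out
puts many_checked_out
```

With a minimum size of 3 and one connection checked out, the first two positions in the queue would be reconnected after being disconnected. With four checked out, no stale connection is reconnected.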
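Dequeue a connection from the queue. If the queue is empty and no new connection can be created, waits up to the wait timeout for a connection to be checked in and raises Timeout::Error if none becomes available.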
@example Dequeue a connection.
queue.dequeue
@return [ Mongo::Server::Connection ] The next connection.
@since 2.0.0
# File lib/mongo/server/connection_pool/queue.rb, line 59
def dequeue
  mutex.synchronize do
    dequeue_connection
  end
end
Disconnect all connections in the queue.
@example Disconnect all connections.
queue.disconnect!
@return [ true ] Always true.
@since 2.1.0
# File lib/mongo/server/connection_pool/queue.rb, line 73
def disconnect!
  mutex.synchronize do
    queue.each { |connection| connection.disconnect! }
    true
  end
end
Enqueue a connection in the queue.
@example Enqueue a connection.
queue.enqueue(connection)
@param [ Mongo::Server::Connection ] connection The connection.
@since 2.0.0
# File lib/mongo/server/connection_pool/queue.rb, line 88
def enqueue(connection)
  mutex.synchronize do
    queue.unshift(connection.record_checkin!)
    resource.broadcast
  end
end
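The LIFO behavior follows from pairing `Array#unshift` on enqueue with `Array#shift` on dequeue: both operate on the front of the array, so the most recently returned connection is handed out first. A minimal sketch with symbols standing in for connections:

```ruby
# Symbols stand in for connections in this illustration.
queue = []

# Check-ins push onto the front of the array.
queue.unshift(:a)
queue.unshift(:b)
queue.unshift(:c)

# Check-out takes from the front, so the last check-in comes out first.
first = queue.shift

puts first.inspect
puts queue.inspect
```

One plausible reason for this ordering is that the most recently used connection is the least likely to have gone stale, while connections at the back of the array stay idle the longest.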
Get a pretty printed string inspection for the queue.
@example Inspect the queue.
queue.inspect
@return [ String ] The queue inspection.
@since 2.0.0
# File lib/mongo/server/connection_pool/queue.rb, line 126
def inspect
  "#<Mongo::Server::ConnectionPool::Queue:0x#{object_id} min_size=#{min_size} max_size=#{max_size} " +
    "wait_timeout=#{wait_timeout} current_size=#{queue.size}>"
end
The maximum number of seconds a socket can remain idle after being checked in to the pool.
@example Get the max idle time.
queue.max_idle_time
@return [ Float ] The max socket idle time in seconds.
@since 2.5.0
# File lib/mongo/server/connection_pool/queue.rb, line 175
def max_idle_time
  @max_idle_time ||= options[:max_idle_time]
end
Get the maximum size of the queue.
@example Get the max size.
queue.max_size
@return [ Integer ] The maximum size of the queue.
@since 2.0.0
# File lib/mongo/server/connection_pool/queue.rb, line 139
def max_size
  @max_size ||= options[:max_pool_size] || MAX_SIZE
end
Get the minimum size of the queue.
@example Get the min size.
queue.min_size
@return [ Integer ] The minimum size of the queue.
@since 2.0.0
# File lib/mongo/server/connection_pool/queue.rb, line 151
def min_size
  @min_size ||= options[:min_pool_size] || MIN_SIZE
end
The time to wait, in seconds, for a connection to become available.
@example Get the wait timeout.
queue.wait_timeout
@return [ Float ] The queue wait timeout.
@since 2.0.0
# File lib/mongo/server/connection_pool/queue.rb, line 163
def wait_timeout
  @wait_timeout ||= options[:wait_queue_timeout] || WAIT_TIMEOUT
end
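The size and timeout readers share one pattern: take the value from the options hash, fall back to a constant, and memoize the result with `||=`. The sketch below shows that pattern in isolation. The `Settings` class and its `DEFAULT_MAX` value of 5 are assumptions for the example, not the driver's definitions.

```ruby
# Hypothetical class showing the option-with-default reader pattern.
class Settings
  DEFAULT_MAX = 5 # assumed value, for illustration only

  def initialize(options = {})
    @options = options
  end

  # The first call resolves the value; later calls reuse the cached result.
  def max_size
    @max_size ||= @options[:max_pool_size] || DEFAULT_MAX
  end
end

default_size = Settings.new.max_size

opts = { max_pool_size: 10 }
settings = Settings.new(opts)
before = settings.max_size

# Changing the hash after the first read has no effect on the cached value.
opts[:max_pool_size] = 20
after = settings.max_size

puts default_size
puts before
puts after
```

A consequence of the memoization is that a reader returns whatever the options held the first time it was called, so changes made to the hash afterwards are not picked up.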
Private Instance Methods
# File lib/mongo/server/connection_pool/queue.rb, line 225
def create_connection
  if @connections < max_size
    @connections += 1
    @block.call
  end
end
# File lib/mongo/server/connection_pool/queue.rb, line 215
def dequeue_connection
  deadline = Time.now + wait_timeout
  loop do
    return queue.shift unless queue.empty?
    connection = create_connection
    return connection if connection
    wait_for_next!(deadline)
  end
end
# File lib/mongo/server/connection_pool/queue.rb, line 232
def wait_for_next!(deadline)
  wait = deadline - Time.now
  if wait <= 0
    raise Timeout::Error.new("Timed out attempting to dequeue connection after #{wait_timeout} sec.")
  end
  resource.wait(mutex, wait)
end
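The private methods combine into a deadline-based wait: compute one deadline up front, then loop, sleeping on the condition variable for whatever time remains until an item appears or the deadline passes. The sketch below reproduces that pattern with the standard library only. `WaitingBox` is a hypothetical class, and connection creation is left out to keep the example short.

```ruby
require 'timeout'

# Hypothetical class demonstrating a deadline-based wait on a
# ConditionVariable. Not part of the driver.
class WaitingBox
  def initialize
    @items = []
    @mutex = Mutex.new
    @resource = ConditionVariable.new
  end

  # Add an item and wake every waiting thread.
  def put(item)
    @mutex.synchronize do
      @items.unshift(item)
      @resource.broadcast
    end
  end

  # Return an item, waiting at most `timeout` seconds in total.
  def take(timeout)
    deadline = Time.now + timeout
    @mutex.synchronize do
      loop do
        return @items.shift unless @items.empty?
        wait = deadline - Time.now
        raise Timeout::Error, "Timed out after #{timeout} sec." if wait <= 0
        # Releases the mutex while sleeping and reacquires it on wake-up.
        @resource.wait(@mutex, wait)
      end
    end
  end
end

box = WaitingBox.new
producer = Thread.new { sleep 0.05; box.put(:connection) }
got = box.take(1)
producer.join

timed_out = begin
  box.take(0.05)
  false
rescue Timeout::Error
  true
end

puts got.inspect
puts timed_out
```

Recomputing the remaining time on every pass matters because a thread can wake without finding an item, for example when another thread took it first. Measuring against a fixed deadline keeps the total wait bounded by the timeout rather than restarting it on each wake-up.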