class Sequel::ThreadedConnectionPool

A connection pool allowing multi-threaded access to a pool of connections. This is the default connection pool used by Sequel.

Constants

USE_WAITER

Attributes

allocated[R]

A hash with thread/fiber keys and connection values for currently allocated connections. The calling code should already have the mutex before calling this.

available_connections[R]

An array of connections that are available for use by the pool. The calling code should already have the mutex before calling this.

max_size[R]

The maximum number of connections this pool will create (per shard/server if sharding).

Public Class Methods

new(db, opts = OPTS)

The following additional options are respected:

:max_connections

The maximum number of connections the connection pool will open (default 4)

:pool_timeout

The number of seconds to wait to acquire a connection before raising a PoolTimeout error (default 5)
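As a rough illustration of how these two options are coerced and defaulted, here is a minimal sketch. The `pool_settings` helper is hypothetical and not part of Sequel, and it raises `ArgumentError` where the real constructor raises `Sequel::Error`.

```ruby
# Hypothetical helper (not a Sequel API) mirroring the option handling:
# Integer() and Float() accept numeric strings and reject garbage input.
def pool_settings(opts = {})
  max_size = Integer(opts[:max_connections] || 4)
  # Stand-in for the Sequel::Error raised by the real constructor.
  raise ArgumentError, ':max_connections must be positive' if max_size < 1
  {max_size: max_size, timeout: Float(opts[:pool_timeout] || 5)}
end

pool_settings                                          # max_size 4, timeout 5.0
pool_settings(max_connections: '10', pool_timeout: 2)  # max_size 10, timeout 2.0
```

In an application these options are normally passed through `Sequel.connect`, which hands them on to the pool.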

Calls superclass method Sequel::ConnectionPool::new
# File lib/sequel/connection_pool/threaded.rb
def initialize(db, opts = OPTS)
  super
  @max_size = Integer(opts[:max_connections] || 4)
  raise(Sequel::Error, ':max_connections must be positive') if @max_size < 1
  @mutex = Mutex.new
  @connection_handling = opts[:connection_handling]
  @available_connections = []
  @allocated = {}
  @allocated.compare_by_identity
  @timeout = Float(opts[:pool_timeout] || 5)
  @waiter = ConditionVariable.new
end

Public Instance Methods

all_connections() { |c| ... }

Yield all of the available connections, and the one currently allocated to this thread. This will not yield connections currently allocated to other threads, as it is not safe to operate on them. This holds the mutex while it is yielding all of the available connections, which means that until the method’s block returns, the pool is locked.

# File lib/sequel/connection_pool/threaded.rb
def all_connections
  hold do |c|
    sync do
      yield c
      @available_connections.each{|conn| yield conn}
    end
  end
end

disconnect(opts=OPTS)

Removes all connections currently available. This method has the effect of disconnecting from the database, assuming that no connections are currently being used. If you want to be able to disconnect connections that are currently in use, use the ShardedThreadedConnectionPool, which can do that. This connection pool does not, for performance reasons. To use the sharded pool, pass the servers: {} option when connecting to the database.

Once a connection is requested using hold, the connection pool creates new connections to the database.
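The locking pattern used here, detaching the available connections while holding the mutex and closing them only afterwards, can be sketched with plain Ruby objects. The `drain_available` helper and the symbol "connections" are illustrative assumptions, not Sequel code.

```ruby
# Hypothetical helper: empty the shared array under the lock and hand back
# a private snapshot that can be processed without blocking other threads.
def drain_available(mutex, available)
  mutex.synchronize do
    snapshot = available.dup
    available.clear
    snapshot
  end
end

mutex = Mutex.new
available = [:conn_a, :conn_b]          # stand-ins for idle connections
allocated = {Thread.current => :conn_c} # in use, so left untouched

# The slow part (closing sockets in a real pool) runs outside the mutex.
drain_available(mutex, available).each { |conn| puts "disconnecting #{conn}" }
```

Connections that are checked out at the time are not in the drained array, which is why this pool cannot disconnect connections that are in use.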

# File lib/sequel/connection_pool/threaded.rb
def disconnect(opts=OPTS)
  conns = nil
  sync do
    conns = @available_connections.dup
    @available_connections.clear
    @waiter.signal
  end
  conns.each{|conn| disconnect_connection(conn)}
end

hold(server=nil) { |conn| ... }

Chooses the first available connection, or if none are available, creates a new connection. Passes the connection to the supplied block:

pool.hold {|conn| conn.execute('DROP TABLE posts')}

Pool#hold is re-entrant, meaning it can be called recursively in the same thread without blocking.

If no connection is immediately available and the pool is already using the maximum number of connections, Pool#hold will block until a connection is available or the timeout expires. If the timeout expires before a connection can be acquired, a Sequel::PoolTimeout is raised.
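The re-entrancy and timeout behaviour described above can be reproduced with a much smaller pool. The following is only a sketch: `TinyPool` and `TinyPool::PoolTimeout` are hypothetical names, the "connections" are plain objects, and unlike the real pool it creates connections while holding the mutex.

```ruby
# Hypothetical miniature pool for illustration; not part of Sequel.
class TinyPool
  class PoolTimeout < StandardError; end

  def initialize(max_size:, timeout:, &factory)
    @max_size = max_size
    @timeout = timeout
    @factory = factory
    @mutex = Mutex.new
    @waiter = ConditionVariable.new
    @available = []
    @allocated = {}.compare_by_identity
  end

  def hold
    t = Thread.current
    # Re-entrant path: reuse the connection this thread already owns.
    owned = @mutex.synchronize { @allocated[t] }
    return yield(owned) if owned

    conn = acquire(t)
    begin
      yield conn
    ensure
      # Return the connection and wake one waiting thread.
      @mutex.synchronize do
        @available << @allocated.delete(t)
        @waiter.signal
      end
    end
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  def acquire(t)
    deadline = now + @timeout
    @mutex.synchronize do
      loop do
        conn = @available.shift
        # Simplification: the real pool connects outside the mutex.
        conn ||= @factory.call if @allocated.size < @max_size
        return @allocated[t] = conn if conn

        remaining = deadline - now
        raise PoolTimeout, "timeout: #{@timeout}" if remaining <= 0
        @waiter.wait(@mutex, remaining)
      end
    end
  end
end

pool = TinyPool.new(max_size: 1, timeout: 0.1) { Object.new }

# Nested calls in one thread see the same connection and do not block.
pool.hold { |outer| pool.hold { |inner| puts outer.equal?(inner) } }

# With the only connection checked out, a second thread times out.
pool.hold do
  Thread.new do
    begin
      pool.hold { }
    rescue TinyPool::PoolTimeout => e
      puts "timed out (#{e.message})"
    end
  end.join
end
```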

# File lib/sequel/connection_pool/threaded.rb
def hold(server=nil)
  t = Sequel.current
  if conn = owned_connection(t)
    return yield(conn)
  end
  begin
    conn = acquire(t)
    yield conn
  rescue Sequel::DatabaseDisconnectError, *@error_classes => e
    if disconnect_error?(e)
      oconn = conn
      conn = nil
      disconnect_connection(oconn) if oconn
      sync do
        @allocated.delete(t)
        @waiter.signal
      end
    end
    raise
  ensure
    if conn
      sync{release(t)}
      if @connection_handling == :disconnect
        disconnect_connection(conn)
      end
    end
  end
end

pool_type()

# File lib/sequel/connection_pool/threaded.rb
def pool_type
  :threaded
end

size()

The total number of connections opened, either available or allocated. The calling code should not have the mutex before calling this.

# File lib/sequel/connection_pool/threaded.rb
def size
  @mutex.synchronize{_size}
end

Private Instance Methods

_size()

The total number of connections opened, either available or allocated. The calling code should already have the mutex before calling this.

# File lib/sequel/connection_pool/threaded.rb
def _size
  @allocated.length + @available_connections.length
end

acquire(thread)

Assigns a connection to the supplied thread, if one is available. The calling code should NOT already have the mutex when calling this.

This should return a connection if one is available within the timeout, or raise PoolTimeout if a connection could not be acquired within the timeout.

# File lib/sequel/connection_pool/threaded.rb
def acquire(thread)
  if conn = assign_connection(thread)
    return conn
  end

  timeout = @timeout
  timer = Sequel.start_timer

  sync do
    @waiter.wait(@mutex, timeout)
    if conn = next_available
      return(@allocated[thread] = conn)
    end
  end

  until conn = assign_connection(thread)
    elapsed = Sequel.elapsed_seconds_since(timer)
    # :nocov:
    raise_pool_timeout(elapsed) if elapsed > timeout

    # It's difficult to get to this point, it can only happen if there is a race condition
    # where a connection cannot be acquired even after the thread is signalled by the condition variable
    sync do
      @waiter.wait(@mutex, timeout - elapsed)
      if conn = next_available
        return(@allocated[thread] = conn)
      end
    end
    # :nocov:
  end

  conn
end

assign_connection(thread)

Assign a connection to the thread, or return nil if one cannot be assigned. The caller should NOT have the mutex before calling this.
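One detail of this method's source is that, when the pool is full, it reclaims connections still registered to threads that are no longer alive. A minimal sketch of that cleanup step, using a hypothetical `reap_dead_threads` helper and symbols in place of connections:

```ruby
# Hypothetical helper: remove entries whose owning thread has exited and
# return their connections so the caller can disconnect them.
def reap_dead_threads(allocated)
  to_disconnect = []
  allocated.keys.each do |t|
    to_disconnect << allocated.delete(t) unless t.alive?
  end
  to_disconnect
end

# Keys are compared by object identity, as in the pool's own hash.
allocated = {}.compare_by_identity

dead = Thread.new { }  # finishes immediately
dead.join
allocated[dead] = :conn_from_dead_thread
allocated[Thread.current] = :conn_in_use

reap_dead_threads(allocated)  # => [:conn_from_dead_thread]
allocated.size                # => 1
```

Iterating over `allocated.keys` rather than the hash itself makes it safe to delete entries during the loop.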

# File lib/sequel/connection_pool/threaded.rb
def assign_connection(thread)
  # Thread safe as instance variable is only assigned to local variable
  # and not operated on outside mutex.
  allocated = @allocated
  do_make_new = false
  to_disconnect = nil

  sync do
    if conn = next_available
      return(allocated[thread] = conn)
    end

    if (n = _size) >= (max = @max_size)
      allocated.keys.each do |t|
        unless t.alive?
          (to_disconnect ||= []) << allocated.delete(t)
        end
      end
      n = nil
    end

    if (n || _size) < max
      do_make_new = allocated[thread] = true
    end
  end

  if to_disconnect
    to_disconnect.each{|dconn| disconnect_connection(dconn)}
  end

  # Connect to the database outside of the connection pool mutex,
  # as that can take a long time and the connection pool mutex
  # shouldn't be locked while the connection takes place.
  if do_make_new
    begin
      conn = make_new(:default)
      sync{allocated[thread] = conn}
    ensure
      unless conn
        sync{allocated.delete(thread)}
      end
    end
  end

  conn
end

checkin_connection(conn)

Adds a connection to the pool of available connections and returns the connection. The calling code should already have the mutex before calling this.

# File lib/sequel/connection_pool/threaded.rb
def checkin_connection(conn)
  @available_connections << conn
  conn
end

next_available()

Return the next available connection in the pool, or nil if there is not currently an available connection. The calling code should already have the mutex before calling this.
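Which connection counts as "next" depends on the `:connection_handling` setting read in the source. A standalone sketch of the two orderings, with the array and setting passed in explicitly (an assumption made so the example runs without a pool):

```ruby
# Standalone version of the selection logic; the real method reads
# instance variables instead of taking arguments.
def next_available(available, connection_handling)
  case connection_handling
  when :stack
    available.pop    # most recently returned connection first
  else
    available.shift  # longest-idle connection first (queue order)
  end
end

# Check-in appends, so the last element was returned most recently.
conns = [:oldest, :middle, :newest]

next_available(conns.dup, :stack)  # => :newest
next_available(conns.dup, nil)     # => :oldest
next_available([], :stack)         # => nil
```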

# File lib/sequel/connection_pool/threaded.rb
def next_available
  case @connection_handling
  when :stack
    @available_connections.pop
  else
    @available_connections.shift
  end
end

owned_connection(thread)

Returns the connection owned by the supplied thread, if any. The calling code should NOT already have the mutex before calling this.

# File lib/sequel/connection_pool/threaded.rb
def owned_connection(thread)
  sync{@allocated[thread]}
end

preconnect(concurrent = false)

Create the maximum number of connections immediately. The calling code should NOT have the mutex before calling this.
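The difference between the sequential and concurrent modes can be sketched as follows. `build_connections` and `slow_connect` are hypothetical stand-ins for the pool's own connection logic.

```ruby
# Hypothetical helper showing the two strategies for opening `count`
# connections; the block stands in for opening a real connection.
def build_connections(count, concurrent: false, &make_new)
  enum = count.times
  if concurrent
    # Start every thread first, then collect each thread's return value.
    enum.map { Thread.new { make_new.call } }.map(&:value)
  else
    enum.map { make_new.call }
  end
end

slow_connect = proc { sleep(0.05); Object.new }

build_connections(3, &slow_connect)                    # connects one at a time
build_connections(3, concurrent: true, &slow_connect)  # connects in parallel threads
```

`Thread#value` joins the thread and re-raises any exception it raised, so a failed connection attempt still surfaces in the caller.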

# File lib/sequel/connection_pool/threaded.rb
def preconnect(concurrent = false)
  enum = (max_size - _size).times

  conns = if concurrent
    enum.map{Thread.new{make_new(:default)}}.map(&:value)
  else
    enum.map{make_new(:default)}
  end

  sync{conns.each{|conn| checkin_connection(conn)}}
end

raise_pool_timeout(elapsed)

Raise a PoolTimeout error showing the current timeout, the elapsed time, and the database’s name (if any).
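To show what the resulting message looks like, here is the same string interpolation extracted into a hypothetical `pool_timeout_message` helper (the values passed in are made up for illustration):

```ruby
# Hypothetical helper reproducing the message format; the database name
# part is appended only when a name is given.
def pool_timeout_message(timeout, elapsed, name = nil)
  "timeout: #{timeout}, elapsed: #{elapsed}#{", database name: #{name}" if name}"
end

pool_timeout_message(5.0, 5.01)
# => "timeout: 5.0, elapsed: 5.01"
pool_timeout_message(5.0, 5.01, 'primary')
# => "timeout: 5.0, elapsed: 5.01, database name: primary"
```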

# File lib/sequel/connection_pool/threaded.rb
def raise_pool_timeout(elapsed)
  name = db.opts[:name]
  raise ::Sequel::PoolTimeout, "timeout: #{@timeout}, elapsed: #{elapsed}#{", database name: #{name}" if name}"
end

release(thread)

Releases the connection assigned to the supplied thread back to the pool. The calling code should already have the mutex before calling this.

# File lib/sequel/connection_pool/threaded.rb
def release(thread)
  conn = @allocated.delete(thread)

  unless @connection_handling == :disconnect
    checkin_connection(conn)
  end

  @waiter.signal

  # Ensure that after signalling the condition, some other thread is given the
  # opportunity to acquire the mutex.
  # See <https://github.com/socketry/async/issues/99> for more context.
  sleep(0)

  nil
end

sync() { || ... }

Yield to the block while inside the mutex. The calling code should NOT already have the mutex before calling this.
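The reason the caller must not already hold the mutex is that Ruby's `Mutex` is not re-entrant: locking it again from the same thread raises `ThreadError`. A small sketch, using a hypothetical `nested_sync_error` helper:

```ruby
# Hypothetical helper: try to take the same mutex twice from one thread
# and return the resulting error, or nil if none was raised.
def nested_sync_error(mutex)
  mutex.synchronize { mutex.synchronize { :never_reached } }
  nil
rescue ThreadError => e
  e
end

mutex = Mutex.new
err = nested_sync_error(mutex)
puts err.class      # ThreadError
puts mutex.locked?  # false, the outer synchronize released the lock
```

This is why the pool pairs locking methods such as size with lock-free counterparts such as _size that are meant to be called from inside the mutex.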

# File lib/sequel/connection_pool/threaded.rb
def sync
  @mutex.synchronize{yield}
end