class PG::EM::ConnectionPool

Connection pool for PG::EM::Client

Author

Rafal Michalski

The ConnectionPool allocates new connections asynchronously whenever no free connection is left, until the pool holds {#max_size} connections.

If the {Client#async_autoreconnect} option is not set, or the re-connect fails, the failed connection is dropped from the pool.

@example Basic usage

pg = PG::EM::ConnectionPool.new size: 10, dbname: 'foo'
res = pg.query 'select * from bar'

The list of {Client} command methods that are available in {ConnectionPool}:

Fiber synchronized methods:

The asynchronous command methods:

The pool allows at most {#max_size} commands (both deferred and fiber synchronized) to be performed concurrently. Pending requests are queued and executed when connections become available.
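As a rough illustration of the queueing described above, the following sketch models a slot limit with plain fibers. ToyPool and its methods are hypothetical names invented for this example; the real pool also manages actual connections and deferrables, which are left out here.

```ruby
begin
  require 'fiber' # Fiber.current needs this before Ruby 3.1
rescue LoadError
  nil # Fiber.current is already built in
end

# ToyPool is a hypothetical stand-in, not the real PG::EM::ConnectionPool.
class ToyPool
  attr_reader :events

  def initialize(max_size)
    @max_size = max_size
    @in_use   = 0
    @pending  = [] # fibers waiting for a free slot
    @events   = []
  end

  # Runs the block once a slot is free; parks the calling fiber otherwise.
  def hold(name)
    if @in_use < @max_size
      @in_use += 1
    else
      @events << "#{name} queued"
      @pending << Fiber.current
      Fiber.yield # resumed by #release, which hands its slot over
    end
    @events << "#{name} running"
    yield
  end

  # Hands the slot to the oldest waiter, or frees it when nobody waits.
  def release
    if (waiter = @pending.shift)
      waiter.resume
    else
      @in_use -= 1
    end
  end
end

pool = ToyPool.new(2)
jobs = %w[a b c].map do |name|
  Fiber.new do
    pool.hold(name) { Fiber.yield } # pretend to wait for the database
    pool.events << "#{name} done"
    pool.release
  end
end

jobs.each(&:resume) # start all three; "c" has to wait for a slot
jobs[0].resume      # "a" finishes and passes its slot to "c"
puts pool.events
```

In this model the third job only starts running after the first one releases its slot, which is the ordering the paragraph above describes for pending requests.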

Please keep in mind that the above methods may send commands to different clients from the pool each time they are called. You can’t assume anything about which connection is acquired, even if the {#max_size} of the pool is set to one. This is because no connection is shared between two concurrent requests, and connections might occasionally fail and be dropped from the pool.

This prevents the `*_defer` commands from being used to execute transactions.

For transactions use {#transaction} and fiber synchronized methods.

Constants

DEFAULT_SIZE

Attributes

allocated[R]
available[R]
max_size[R]

Maximum number of connections in the connection pool

@return [Integer]

Public Class Methods

async_connect(options = {}, &blk)
Alias for: connect_defer
connect(options = {}, &on_connect)
Alias for: new
connect_defer(options = {}, &blk)

Creates and initializes a new connection pool.

Attempts to establish the first connection asynchronously.

@return [FeaturedDeferrable]

@yieldparam pg [Client|PG::Error] new and connected client instance on success or a raised PG::Error

Use the returned deferrable’s callback hook to obtain the newly created {ConnectionPool}. In case of a connection error, the errback hook is called with the raised error object as its argument.

If a block is provided, it is bound to both the callback and errback hooks of the returned deferrable.

Pass PG::EM::Client options together with ConnectionPool options:

  • :size = 4 - the maximum number of concurrent connections

  • :connection_class = {PG::EM::Client}

@raise [ArgumentError]
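The callback and errback wiring described above can be pictured with a small synchronous stand-in. ToyDeferrable below is hypothetical: it is neither EM::Deferrable nor FeaturedDeferrable, it settles only once, and its completion method is merely assumed to behave like the block binding described for connect_defer.

```ruby
# ToyDeferrable is a hypothetical, synchronous model of a deferrable.
class ToyDeferrable
  def initialize
    @hooks  = { succeeded: [], failed: [] }
    @status = nil
    @args   = []
  end

  def callback(&blk)
    add_hook(:succeeded, blk)
  end

  def errback(&blk)
    add_hook(:failed, blk)
  end

  # Binds one block to both outcomes.
  def completion(&blk)
    callback(&blk)
    errback(&blk)
  end

  def succeed(*args)
    settle(:succeeded, args)
  end

  def fail(*args)
    settle(:failed, args)
  end

  private

  def add_hook(status, blk)
    if @status == status
      blk.call(*@args)      # already settled this way: run at once
    elsif @status.nil?
      @hooks[status] << blk # still pending: remember for later
    end
    self
  end

  def settle(status, args)
    return self if @status  # simplification: the toy settles only once
    @status = status
    @args   = args
    @hooks[status].each { |blk| blk.call(*args) }
    self
  end
end

seen = []
ok = ToyDeferrable.new
ok.completion { |result| seen << result }
ok.succeed(:pool) # the block receives the pool stand-in

bad = ToyDeferrable.new
bad.completion { |result| seen << result.class }
bad.fail(RuntimeError.new('connection refused')) # the block receives the error
p seen # => [:pool, RuntimeError]
```

A block bound this way has to inspect its argument to tell success from failure, which matches the [Client|PG::Error] type given for the yielded parameter.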

# File lib/pg/em/connection_pool.rb, line 148
def self.connect_defer(options = {}, &blk)
  pool = new options.merge(lazy: true)
  pool.__send__(:hold_deferred, blk) do
    ::EM::DefaultDeferrable.new.tap { |df| df.succeed pool }
  end
end
Also aliased as: async_connect
new(options = {}, &on_connect)

Creates and initializes a new connection pool.

The connection pool allocates its first connection upon initialization unless the +lazy: true+ option is given.

Pass PG::EM::Client options together with ConnectionPool options:

  • :size = 4 - the maximum number of concurrent connections

  • :lazy = false - whether to defer allocating the first connection until it is needed

  • :connection_class = {PG::EM::Client}

For convenience the given block will be set as the on_connect option.

@yieldparam pg [Client] connected client instance on each newly created connection

@yieldparam is_async [Boolean] always true in a connection pool context

@yieldparam is_reset [Boolean] always false unless the +async_autoreconnect+ option is +true+ and the client was actually re-connecting

@raise [PG::Error]

@raise [ArgumentError]

@see Client#on_connect
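The way pool options are separated from client options can be sketched in isolation. split_pool_options is a hypothetical helper written for this example; it only mirrors the option handling visible in the initializer and uses the documented default size of 4.

```ruby
# Hypothetical helper: returns [pool_options, client_options].
def split_pool_options(options = {}, &on_connect)
  # An explicit :on_connect option wins over the block, as in the source.
  options = { on_connect: on_connect }.merge(options) if on_connect
  pool = { size: 4, lazy: false } # 4 is the documented default size
  client = options.reject do |key, value|
    case key.to_sym
    when :size, :max_size
      pool[:size] = value.to_i
      true # pool option: keep it away from the client
    when :lazy
      pool[:lazy] = value
      true
    end # any other key yields nil and is passed on to the client
  end
  raise ArgumentError, 'pool size must be >= 1' if pool[:size] < 1

  [pool, client]
end

pool, client = split_pool_options({ 'size' => '2', dbname: 'foo' }) { |pg| pg }
p pool        # size is 2, lazy is false
p client.keys # => [:on_connect, :dbname]
```

String keys and string values are accepted because each key is converted with to_sym and the size with to_i, so options read from a configuration file would pass through unchanged.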

# File lib/pg/em/connection_pool.rb, line 94
def initialize(options = {}, &on_connect)
  @available = []
  @pending = []
  @allocated = {}
  @max_size = DEFAULT_SIZE
  @connection_class = Client

  if block_given?
    options = {on_connect: on_connect}.merge(options)
  end

  lazy = false
  @options = options.reject do |key, value|
    case key.to_sym
    when :size, :max_size
      @max_size = value.to_i
      true
    when :connection_class
      @connection_class = value
      true
    when :lazy
      lazy = value
      true
    end
  end

  raise ArgumentError, "#{self.class}.new: pool size must be >= 1" if @max_size < 1

  # allocate first connection, unless we are lazy
  hold unless lazy
end
Also aliased as: connect

Public Instance Methods

close()
Alias for: finish
execute()
Alias for: hold
finish()

Finishes all available connections and clears the available pool.

After a call to this method the pool is still usable and will try to allocate new client connections on subsequent query commands.

# File lib/pg/em/connection_pool.rb, line 171
def finish
  @available.each { |c| c.finish }
  @available.clear
  self
end
Also aliased as: close
hold() { |conn| ... }

Acquires a {Client} connection and passes it to the given block.

The connection is allocated to the current fiber, which ensures that any subsequent query from the same fiber will be performed on that connection.

It is possible to nest hold calls from the same fiber; each time, the block is given the same {Client} instance. This feature is needed e.g. for nesting transaction calls.

@yieldparam [Client] pg
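A minimal sketch of the nesting rule, assuming connections are tracked per fiber by object id as described above. ToyHolder is a hypothetical model; its "connections" are plain strings and it never waits for a free slot.

```ruby
begin
  require 'fiber' # Fiber.current needs this before Ruby 3.1
rescue LoadError
  nil # Fiber.current is already built in
end

# ToyHolder is a hypothetical model, not the real ConnectionPool.
class ToyHolder
  def initialize
    @available = []
    @allocated = {} # fiber object_id => connection
    @serial    = 0
  end

  def hold
    id = Fiber.current.object_id
    if (conn = @allocated[id])
      nested = true # inner call: reuse the connection, do not release it
    else
      conn = @allocated[id] = @available.pop || "conn-#{@serial += 1}"
    end
    yield conn
  ensure
    # Only the outermost hold returns the connection to the free list.
    @available << @allocated.delete(id) unless nested
  end
end

pool = ToyHolder.new
seen = []
pool.hold do |outer|
  pool.hold { |inner| seen << outer.equal?(inner) } # same fiber
  Fiber.new { pool.hold { |other| seen << other.equal?(outer) } }.resume
end
p seen # => [true, false]
```

The nested call on the same fiber sees the very same object, while a different fiber started in the meantime gets a connection of its own.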

# File lib/pg/em/connection_pool.rb, line 314
def hold
  fiber = Fiber.current
  id = fiber.object_id

  if conn = @allocated[id]
    skip_release = true
  else
    conn = acquire(fiber) until conn
  end

  begin
    yield conn if block_given?

  rescue PG::Error
    if conn.status != PG::CONNECTION_OK
      conn.finish unless conn.finished?
      drop_failed(id)
      skip_release = true
    end
    raise
  ensure
    release(id) unless skip_release
  end
end
Also aliased as: execute
method_missing(*a, &b)
# File lib/pg/em/connection_pool.rb, line 341
def method_missing(*a, &b)
  hold { |c| c.__send__(*a, &b) }
end
respond_to_missing?(m, priv = false)
# File lib/pg/em/connection_pool.rb, line 345
def respond_to_missing?(m, priv = false)
  hold { |c| c.respond_to?(m, priv) }
end
size()

Current number of connections in the connection pool

@return [Integer]

# File lib/pg/em/connection_pool.rb, line 163
def size
  @available.length + @allocated.length
end
transaction(&blk)

Executes a BEGIN at the start of the block and a COMMIT at the end of the block or ROLLBACK if any exception occurs. Calls to transaction may be nested, however without sub-transactions (save points).

@example Transactions

pg = PG::EM::ConnectionPool.new size: 10
pg.transaction do
  pg.exec('insert into animals (family, species) values ($1,$2)',
          [family, species])
  num = pg.query('select count(*) from people where family=$1',
          [family]).get_value(0,0)
  pg.exec('update stats set count = $1 where family=$2',
          [num, family])
end

@see Client#transaction

@see hold
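Nesting without save points can be pictured as a depth counter in which only the outermost call issues SQL. The sketch below is an assumption about how such nesting may work, not the actual implementation of Client#transaction; ToyClient and its sql log are hypothetical.

```ruby
# ToyClient is a hypothetical model that records SQL instead of sending it.
class ToyClient
  attr_reader :sql

  def initialize
    @sql   = []
    @depth = 0
  end

  def transaction
    outermost = @depth.zero?
    @sql << 'BEGIN' if outermost
    @depth += 1
    begin
      result = yield self
    rescue Exception # any exception rolls back, then propagates
      @sql << 'ROLLBACK' if outermost
      raise
    ensure
      @depth -= 1
    end
    @sql << 'COMMIT' if outermost
    result
  end
end

pg = ToyClient.new
pg.transaction do
  pg.transaction { pg.sql << 'INSERT' } # inner call adds no BEGIN or COMMIT
end

begin
  pg.transaction { pg.transaction { raise 'boom' } }
rescue RuntimeError
  nil # the error surfaces only after the outermost ROLLBACK
end
p pg.sql # => ["BEGIN", "INSERT", "COMMIT", "BEGIN", "ROLLBACK"]
```

Because there are no save points in this model, an error raised in an inner block undoes the whole outer transaction rather than just the inner part.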

# File lib/pg/em/connection_pool.rb, line 298
def transaction(&blk)
  hold do |pg|
    pg.transaction(&blk)
  end
end

Private Instance Methods

acquire(fiber)

Gets an available connection, creates a new one, or puts the calling fiber on hold.

@return [Client] on success

@return [nil] when a dropped connection creates a free slot

# File lib/pg/em/connection_pool.rb, line 354
def acquire(fiber)
  if conn = @available.pop
    @allocated[fiber.object_id] = conn
  else
    if size < max_size
      begin
        id = fiber.object_id
        # mark allocated pool for proper #size value
        # the connection is made asynchronously
        @allocated[id] = opts = DeferredOptions.new
        conn = @connection_class.new(@options)
      ensure
        if conn
          opts.apply conn
          @allocated[id] = conn
        else
          drop_failed(id)
        end
      end
    else
      @pending << fiber
      Fiber.yield
    end
  end
end
acquire_deferred(df, &blk)

Asynchronously creates a new connection or gets a released one.

@param df [EM::Deferrable] the acquiring object and the one to fail when establishing the connection fails

@return [EM::Deferrable] the deferrable that will succeed with either a new or a released connection
# File lib/pg/em/connection_pool.rb, line 428
def acquire_deferred(df, &blk)
  id = df.object_id
  if size < max_size
    # mark allocated pool for proper #size value
    # the connection is made asynchronously
    @allocated[id] = opts = DeferredOptions.new
    @connection_class.connect_defer(@options).callback {|conn|
      opts.apply conn
    }.errback do |err|
      drop_failed(id)
      df.fail(err)
    end
  else
    @pending << (conn_df = ::EM::DefaultDeferrable.new)
    conn_df.errback do
      # a dropped connection made a free slot
      acquire_deferred(df, &blk)
    end
  end.callback(&blk)
end
drop_failed(id)

Drops a failed connection (or its placeholder mark) from the pool and ensures that the pending requests won’t starve.
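The starvation guard can be illustrated with a reduced model: a parked fiber that is woken without a connection simply retries, and this time finds the freed slot. ToyRetryPool is hypothetical and keeps only the parts of acquire and drop_failed needed to show that retry path.

```ruby
begin
  require 'fiber' # Fiber.current needs this before Ruby 3.1
rescue LoadError
  nil # Fiber.current is already built in
end

# ToyRetryPool is a hypothetical model of the retry path only.
class ToyRetryPool
  attr_reader :allocated, :pending

  def initialize(max_size)
    @max_size  = max_size
    @allocated = {} # fiber object_id => connection
    @pending   = []
    @serial    = 0
  end

  # Returns a connection, or nil when woken up by #drop_failed.
  def acquire(fiber)
    if @allocated.length < @max_size
      @allocated[fiber.object_id] = "conn-#{@serial += 1}"
    else
      @pending << fiber
      Fiber.yield
    end
  end

  # Removes the failed entry and wakes one waiter so it can retry.
  def drop_failed(id)
    @allocated.delete(id)
    waiter = @pending.shift
    waiter.resume if waiter # resumed without a value, so acquire returns nil
  end
end

pool  = ToyRetryPool.new(1)
got   = nil
owner = Fiber.new do
  pool.acquire(Fiber.current)
  Fiber.yield # keeps the only slot occupied
end
waiter = Fiber.new do
  conn = nil
  conn = pool.acquire(Fiber.current) until conn # retry after a nil wake-up
  got = conn
end

owner.resume                      # takes the only slot ("conn-1")
waiter.resume                     # pool is full, so the waiter parks
pool.drop_failed(owner.object_id) # "conn-1" failed: the waiter retries
p got # => "conn-2"
```

Without the wake-up in drop_failed, the parked fiber in this model would wait forever, since no release would ever happen for the failed connection.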

# File lib/pg/em/connection_pool.rb, line 451
def drop_failed(id)
  @allocated.delete(id)
  if pending = @pending.shift
    if pending.is_a?(Fiber)
      pending.resume
    else
      pending.fail
    end
  end
end
hold_deferred(blk = nil) { |conn| ... }

Asynchronously acquires a {Client} connection and passes it to the given block on success.

The block receives the acquired connection as its argument and should return a deferrable object, which is either returned from this method directly or status-bound to another deferrable that this method returns.

@param blk [Proc] optional block passed to callback and errback of the returned deferrable object

@yieldparam pg [Client] a connected client instance

@yieldreturn [EM::Deferrable]

@return [EM::Deferrable]

# File lib/pg/em/connection_pool.rb, line 393
def hold_deferred(blk = nil)
  if conn = @available.pop
    id = conn.object_id
    @allocated[id] = conn
    df = yield conn
  else
    df = FeaturedDeferrable.new
    id = df.object_id
    acquire_deferred(df) do |nc|
      @allocated[id] = conn = nc
      df.bind_status yield conn
    end
  end
  df.callback { release(id) }
  df.errback do |err|
    if conn
      if err.is_a?(PG::Error) &&
          conn.status != PG::CONNECTION_OK
        conn.finish unless conn.finished?
        drop_failed(id)
      else
        release(id)
      end
    end
  end
  df.completion(&blk) if blk
  df
end
release(id)

Releases the connection and passes it to the next pending request, or returns it to the free pool when nothing is pending.

# File lib/pg/em/connection_pool.rb, line 464
def release(id)
  conn = @allocated.delete(id)
  if pending = @pending.shift
    if pending.is_a?(Fiber)
      @allocated[pending.object_id] = conn
      pending.resume conn
    else
      pending.succeed conn
    end
  else
    @available << conn
  end
end