class PG::EM::Client

PostgreSQL EventMachine client

Author

Rafal Michalski

{PG::EM::Client} is a PG::Connection wrapper designed for EventMachine.

The following new methods:

are added to execute queries asynchronously, returning a Deferrable object.

The following methods of PG::Connection are overloaded:

and now auto-detect whether EventMachine is running, either performing commands asynchronously (blocking only the current fiber) or calling the parent thread-blocking methods.

If the {#async_autoreconnect} option is set to true, all of the above methods (in asynchronous mode) try to re-connect after a connection error occurs. This happens behind the scenes, so no error is raised unless a transaction was in progress. In that case the error is raised after the connection is re-established, to signal that the transaction was aborted.

To detect an auto re-connect event, use the {#on_autoreconnect} property/option.

To enable auto-reconnecting set:

client.async_autoreconnect = true

or pass it in the hash argument to {new}:

PG::EM::Client.new dbname: 'bar', async_autoreconnect: true

There are also new methods, {connect_defer} and {#reset_defer}, which are asynchronous versions of PG::Connection.new and PG::Connection#reset.

Additionally, {new} and {#reset} are overloaded, providing auto-detecting asynchronous (fiber-synchronized) or thread-blocking methods for (re)connecting.

Otherwise nothing changes in the PG::Connection API. See the PG::Connection docs for an explanation of the arguments to the above methods.

Warning:

{#describe_prepared} and {#exec_prepared} should only be invoked on the same connection on which {#prepare} was called. If you are using a {ConnectionPool}, make sure to acquire a single connection first, or execute the prepare command on every connection using the {#on_connect} hook.

Constants

DEFAULT_ASYNC_VARS
TRAN_BEGIN_QUERY

TRAN_COMMIT_QUERY
TRAN_ROLLBACK_QUERY

Attributes

async_autoreconnect[RW]

@!attribute async_autoreconnect

@return [Boolean] asynchronous auto re-connect status
Enable/disable auto re-connect feature (+true+/+false+).
Defaults to +false+ unless {#on_autoreconnect} is specified
in a +connection_hash+.

Changing {#on_autoreconnect} with accessor method doesn't change
the state of {#async_autoreconnect}.

You may set +:async_autoreconnect+ in a +connection_hash+ argument
passed to {new} or {connect_defer}.
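
The default described above can be illustrated with a small sketch. The helper +autoreconnect_enabled?+ is hypothetical (it is not part of the library) and mirrors only the documented rule for symbol keys: an explicit +:async_autoreconnect+ value wins, otherwise the flag is on only when an +:on_autoreconnect+ hook is present in the +connection_hash+.

```ruby
# Hypothetical helper, for illustration only; not part of PG::EM::Client.
# It models how the effective async_autoreconnect flag is derived from a
# connection_hash according to the attribute documentation.
def autoreconnect_enabled?(connection_hash)
  explicit = connection_hash[:async_autoreconnect]
  # An explicitly given value always decides the outcome.
  return !!explicit unless explicit.nil?
  # Otherwise the flag defaults to true only if a hook was supplied.
  connection_hash.key?(:on_autoreconnect)
end

hook = proc { |pg, error| }

no_options      = autoreconnect_enabled?({})
hook_only       = autoreconnect_enabled?(on_autoreconnect: hook)
hook_but_off    = autoreconnect_enabled?(on_autoreconnect: hook,
                                         async_autoreconnect: false)
explicit_only   = autoreconnect_enabled?(async_autoreconnect: true)
```

Setting the hook later through the accessor is a different case: as noted above, that does not change the state of {#async_autoreconnect}.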
async_command_aborted[RW]

@!visibility private Used internally for marking connection as aborted on query timeout.

connect_timeout[RW]

@!attribute connect_timeout

@return [Float] connection timeout in seconds
Connection timeout. Affects {#reset} and {#reset_defer}.

Changing this property does not affect thread-blocking {#reset} unless
passed as a +connection_hash+.

To enable it set to some positive value. To disable it: set to 0.

You may set +:connect_timeout+ in a +connection_hash+ argument
passed to {new} or {connect_defer}.
on_autoreconnect[W]

@!attribute on_autoreconnect

@return [Proc<Client, Error>] auto re-connect hook
A proc-like object that is called after a connection with the
server has been automatically re-established. It is invoked
just before the pending command is sent to the server.

@yieldparam pg [Client] re-connected client instance
@yieldparam error [Exception] an error after which the auto re-connect
                           process began.
@yieldreturn [false|true|Exception|EM::Deferrable|*]

The first argument it receives is the connected {Client} instance.
The second is the original +error+ that caused the reconnecting
process.

It's possible to execute queries from the +on_autoreconnect+ hook.
Code is being executed in a fiber context, so both deferrable and
fiber-synchronized query commands may be used.

If an exception is raised during execution of the +on_autoreconnect+
hook, the reset operation will fail with that exception.

The hook can control later actions with its return value:

- +false+ (explicitly, +nil+ is ignored) - the original +exception+
  is raised/passed back and the pending query command is not sent
  again to the server.
- +true+ (explicitly, truish values are ignored), the pending command
  is called regardless of the connection's last transaction status.
- +Exception+ object - is raised/passed back and the pending command
  is not sent.
- +Deferrable+ object - the chosen action will depend on the returned
  deferrable status.
- Other values are ignored and the pending query command is
  immediately sent to the server unless there was a transaction in
  progress before the connection was reset.
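
The rules in the list above can be summarised as a small decision function. This is a simplified sketch, not the library's code: the helper name +action_for+ and the symbols it returns are hypothetical, and it leaves out what happens once a returned deferrable completes.

```ruby
# Hypothetical helper, for illustration only. Maps the value returned by an
# on_autoreconnect hook (and whether a transaction was in progress when the
# connection broke) to the action described in the list above.
def action_for(hook_result, in_transaction)
  if hook_result.respond_to?(:callback) && hook_result.respond_to?(:errback)
    :wait_for_deferrable            # outcome depends on the deferrable
  elsif hook_result.is_a?(Exception)
    :fail_with_returned_exception   # pending command is not sent
  elsif hook_result == false
    :fail_with_original_error       # pending command is not sent
  elsif hook_result == true
    :resend_command                 # even if a transaction was in progress
  else
    # nil and any other value: resend unless a transaction was interrupted
    in_transaction ? :fail_with_original_error : :resend_command
  end
end

ignored_value_idle  = action_for(nil, false)
ignored_value_trans = action_for(nil, true)
forced_resend       = action_for(true, true)
explicit_refusal    = action_for(false, false)
custom_error        = action_for(ArgumentError.new('stale session'), false)
```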

If both the +on_connect+ and +on_autoreconnect+ hooks are set,
+on_connect+ is called first and +on_autoreconnect+ is
called only when +on_connect+ succeeds.

You may set +:on_autoreconnect+ hook in a +connection_hash+ argument
passed to {new} or {connect_defer}.

@example How to use deferrable in on_autoreconnect hook
  pg.on_autoreconnect do |pg, e|
    logger.warn "PG connection was reset: #{e.inspect}, delaying 1 sec."
    EM::DefaultDeferrable.new.tap do |df|
      EM.add_timer(1) { df.succeed }
    end
  end
on_connect[W]

@!attribute on_connect

@return [Proc<Client,is_async,is_reset>] connect hook
A proc-like object that is called after a connection with
the server has been established.

@yieldparam pg [Client] connected client instance
@yieldparam is_async [Boolean] flag indicating if the connection
            was established asynchronously
@yieldparam is_reset [Boolean] flag indicating if the connection
            client was reset
@yieldreturn [EM::Deferrable|*]

The first argument it receives is the connected {Client} instance.
The second argument is +true+ if the connection was established in
asynchronous manner, +false+ otherwise.
The third argument is +true+ when the connection has been reset or
+false+ on new connection.

It's possible to execute queries from the +on_connect+ hook.
Code is being executed in a fiber context, so both deferrable and
fiber-synchronized query commands may be used.
However, deferrable commands work only if the EventMachine reactor
is running, so check that +is_async+ is +true+ before using them.

If an exception is raised during execution of the +on_connect+ hook,
the connecting/reset operation will fail with that exception.

The hook can control later actions with its return value:

- +Deferrable+ object - the connection establishing status will depend
  on the returned deferrable status (only in asynchronous mode).
- Other values are ignored.
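
Because the hook runs both on a new connection and after a reset, it is a natural place to re-create per-connection state. The sketch below exercises such a hook against a stand-in object; +RecordingClient+ is hypothetical and exists only so the example runs without a database. The three arguments follow the order documented above.

```ruby
# Stand-in for a connected client (hypothetical, for illustration only):
# it records the names of the statements it is asked to prepare.
class RecordingClient
  attr_reader :prepared

  def initialize
    @prepared = []
  end

  def prepare(name, _sql)
    @prepared << name
  end
end

events = []

# The hook receives the client, the is_async flag and the is_reset flag.
on_connect = proc do |pg, _is_async, is_reset|
  events << (is_reset ? :reset : :new_connection)
  pg.prepare('species_by_name',
             'select id, name from animals where species=$1 order by name')
end

client = RecordingClient.new
on_connect.call(client, false, false) # as after a blocking connect
on_connect.call(client, false, true)  # as after a blocking reset
```

With the real client the same proc could be supplied as the +:on_connect+ entry of the +connection_hash+.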

If both the +on_connect+ and +on_autoreconnect+ hooks are set,
+on_connect+ is called first and +on_autoreconnect+ is
called only when +on_connect+ succeeds.

You may set +:on_connect+ hook in a +connection_hash+ argument
passed to {new} or {connect_defer}.

@example How to use prepare in on_connect hook
  PG::EM::Client.new(on_connect: proc {|pg|
    pg.prepare("species_by_name",
     "select id, name from animals where species=$1 order by name")
  })
query_timeout[RW]

@!attribute query_timeout

@return [Float] query timeout in seconds
Aborts async command processing if the server response time
exceeds +query_timeout+ seconds. This does not apply to
{#reset} and {#reset_defer}.

To enable it set to some positive value. To disable it: set to 0.

You may set +:query_timeout+ in a +connection_hash+ argument
passed to {new} or {connect_defer}.

Public Class Methods

async_connect(*args, &blk)

@deprecated Use {connect_defer} instead.

Alias for: connect_defer
connect(*args, &blk)
Alias for: new
connect_defer(*args, &blk)

Attempts to establish the connection asynchronously.

@return [FeaturedDeferrable]
@yieldparam pg [Client|PG::Error] new and connected client instance on success or an instance of raised PG::Error

Pass the block to the returned deferrable’s callback to obtain newly created and already connected {Client} object. In case of connection error errback hook receives an error object as an argument. If the block is provided it’s bound to both callback and errback hooks of the returned deferrable.
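
Since a block given directly to the method is bound to both hooks, it has to tell a client apart from an error itself. A minimal sketch of such a handler follows; the proc name and the values it returns are made up for illustration, and the call to the real method is shown only as a comment.

```ruby
# A completion handler suitable as the block for connect_defer. It receives
# either the connected client (success) or the raised error (failure).
on_connected = proc do |client_or_error|
  if client_or_error.is_a?(Exception)
    [:failed, client_or_error.message]
  else
    [:connected, client_or_error]
  end
end

# With the real client, inside a running reactor, this might look like:
#   PG::EM::Client.connect_defer(dbname: 'bar', &on_connected)

# Exercising the handler directly with stand-in values:
failure = on_connected.call(RuntimeError.new('connection refused'))
success = on_connected.call(:stand_in_client)
```

When separate code paths are preferred, the block can be omitted and handlers attached to the returned deferrable's callback and errback instead.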

Special {Client} options (e.g.: {#async_autoreconnect}) must be provided in a connection_hash argument variant. They will be ignored if passed in a connection_string.

client_encoding will be set according to Encoding.default_internal.

@see deveiate.org/code/pg/PG/Connection.html#method-c-new PG::Connection.new

# File lib/pg/em.rb, line 354
def self.connect_defer(*args, &blk)
  df = FeaturedDeferrable.new(&blk)
  async_args = parse_async_options(args)
  conn = df.protect { connect_start(*args) }
  if conn
    async_args.each {|k, v| conn.instance_variable_set(k, v) }
    ::EM.watch(conn.socket_io, ConnectWatcher, conn, df, false).
      poll_connection_and_check
  end
  df
end
Also aliased as: async_connect
new(*args) { |conn| ... }

Creates new instance of PG::EM::Client and attempts to establish connection.

Performs command asynchronously yielding from current fiber if EventMachine reactor is running and current fiber isn’t the root fiber. Other fibers can process while waiting for the server to complete the request.

Otherwise performs a thread-blocking call to the parent method.
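
The fiber-synchronized behaviour can be pictured with plain Ruby fibers. The sketch below is not the library's implementation: +ToyReactor+ is a hypothetical stand-in for EventMachine that simply queues callbacks, and +connect_sync+ stands in for any call that looks blocking to its caller but suspends only the calling fiber.

```ruby
# Hypothetical miniature event loop: callbacks queued with #defer run later,
# when #run_pending is called.
class ToyReactor
  def initialize
    @pending = []
  end

  def defer(&callback)
    @pending << callback
  end

  def run_pending
    @pending.shift.call until @pending.empty?
  end
end

reactor = ToyReactor.new
log = []

# Looks synchronous to the caller: schedules the work, suspends the calling
# fiber, and returns the value passed to fiber.resume by the callback.
connect_sync = lambda do |fiber|
  reactor.defer { fiber.resume(:connected) }
  Fiber.yield
end

worker = nil
worker = Fiber.new do
  log << :connecting
  log << connect_sync.call(worker)
end

worker.resume       # runs until Fiber.yield inside connect_sync
log << :other_work  # the loop is free to serve other fibers meanwhile
reactor.run_pending # fires the callback, which resumes the worker
```

The root fiber has nothing to yield to, which is why the thread-blocking parent method is used there instead.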

@raise [PG::Error]

Special {Client} options (e.g.: {#async_autoreconnect}) must be provided in a connection_hash argument variant. They will be ignored if passed in a connection_string.

client_encoding will be set according to Encoding.default_internal.

@see deveiate.org/code/pg/PG/Connection.html#method-c-new PG::Connection.new

Calls superclass method
# File lib/pg/em.rb, line 456
def self.new(*args, &blk)
  if ::EM.reactor_running? && !(f = Fiber.current).equal?(ROOT_FIBER)
    connect_defer(*args) {|r| f.resume(r) }

    conn = Fiber.yield
    raise conn if conn.is_a?(::Exception)
    if block_given?
      begin
        yield conn
      ensure
        conn.finish
      end
    else
      conn
    end
  else
    conn = super(*args)
    if on_connect = conn.on_connect
      on_connect.call(conn, false, false)
    end
    conn
  end
end
Also aliased as: connect, open, setdb, setdblogin
new(*args)

@!visibility private

Calls superclass method
# File lib/pg/em.rb, line 481
def initialize(*args)
  Client.parse_async_options(args).each {|k, v| instance_variable_set(k, v) }
  super(*args)
end
open(*args, &blk)
Alias for: new
parse_async_options(args)

@!visibility private

# File lib/pg/em.rb, line 294
def self.parse_async_options(args)
  options = DEFAULT_ASYNC_VARS.dup
  if args.last.is_a? Hash
    args[-1] = args.last.reject do |key, value|
      case key.to_sym
      when :async_autoreconnect
        options[:@async_autoreconnect] = value
        true
      when :on_connect
        if value.respond_to? :call
          options[:@on_connect] = value
        else
          raise ArgumentError, "on_connect must respond to `call'"
        end
        true
      when :on_reconnect
        raise ArgumentError, "on_reconnect is no longer supported, use on_autoreconnect"
      when :on_autoreconnect
        if value.respond_to? :call
          options[:@on_autoreconnect] = value
          options[:@async_autoreconnect] = true if options[:@async_autoreconnect].nil?
        else
          raise ArgumentError, "on_autoreconnect must respond to `call'"
        end
        true
      when :connect_timeout
        options[:@connect_timeout] = value.to_f
        false
      when :query_timeout
        options[:@query_timeout] = value.to_f
        true
      end
    end
  end
  options[:@async_autoreconnect] = !!options[:@async_autoreconnect]
  options[:@connect_timeout] ||= ENV[@@connect_timeout_envvar].to_f
  options
end
setdb(*args, &blk)
Alias for: new
setdblogin(*args, &blk)
Alias for: new
single_row_mode?()

Returns true if pg supports single row mode or false otherwise. Single row mode is available since libpq 9.2.

@return [Boolean]
@see deveiate.org/code/pg/PG/Connection.html#method-i-set_single_row_mode PG::Connection#set_single_row_mode

# File lib/pg/em.rb, line 270
def self.single_row_mode?
  method_defined? :set_single_row_mode
end

Public Instance Methods

async_autoreconnect!(deferrable, error, send_proc = nil) { || ... }

@!visibility private Perform auto re-connect. Used internally.

# File lib/pg/em.rb, line 526
def async_autoreconnect!(deferrable, error, send_proc = nil, &on_connection_bad)
  # reconnect only if connection is bad and flag is set
  if self.status == CONNECTION_BAD

    yield if block_given?

    if async_autoreconnect
      # check if transaction was active
      was_in_transaction = case @last_transaction_status
      when PQTRANS_IDLE, PQTRANS_UNKNOWN
        false
      else
        true
      end
      # reset asynchronously
      reset_df = reset_defer
      # just fail on reset failure
      reset_df.errback { |ex| deferrable.fail ex }
      # reset succeeds
      reset_df.callback do
        # handle on_autoreconnect
        if on_autoreconnect
          # wrap in a fiber, so on_autoreconnect code may yield from it
          Fiber.new do
            # call on_autoreconnect handler and fail if it raises an error
            returned_df = begin
              on_autoreconnect.call(self, error)
            rescue => ex
              ex
            end
            if returned_df.respond_to?(:callback) && returned_df.respond_to?(:errback)
              # the handler returned a deferrable
              returned_df.callback do
                if was_in_transaction || !send_proc
                  # fail anyway, there was a transaction in progress or in single result mode
                  deferrable.fail error
                else
                  # try to call failed query command again
                  deferrable.protect(&send_proc)
                end
              end
              # fail when handler's deferrable fails
              returned_df.errback { |ex| deferrable.fail ex }
            elsif returned_df.is_a?(Exception)
              # the handler returned an exception object, so fail with it
              deferrable.fail returned_df
            elsif returned_df == false || !send_proc || (was_in_transaction && returned_df != true)
              # the handler returned false or in single result mode
              # or there was an active transaction and handler didn't return true
              deferrable.fail error
            else
              # try to call failed query command again
              deferrable.protect(&send_proc)
            end
          end.resume
        elsif was_in_transaction || !send_proc
          # there was a transaction in progress or in single result mode;
          # fail anyway
          deferrable.fail error
        else
          # no on_autoreconnect handler, no transaction
          # try to call failed query command again
          deferrable.protect(&send_proc)
        end
      end
      # connection is bad, reset in progress, all done
      return
    end
  end
  # connection is either good or bad, the async_autoreconnect is not set
  deferrable.fail error
end
async_reset(&blk)

@deprecated Use {reset_defer} instead.

Alias for: reset_defer
blocking_get_result()
Alias for: get_result
blocking_wait_for_notify(timeout = nil)

Alias for: wait_for_notify
close()
Alias for: finish
finish()

Closes the backend connection.

Detaches the watch handler to prevent a memory leak after calling the parent PG::Connection#finish.

@see deveiate.org/code/pg/PG/Connection.html#method-i-finish PG::Connection#finish

Calls superclass method
# File lib/pg/em.rb, line 500
def finish
  super
  if @watcher
    @watcher.detach if @watcher.watching?
    @watcher = nil
  end
end
Also aliased as: close
get_last_result_defer(&blk)

Asynchronously retrieves all available results on the current connection (from previously issued asynchronous commands like +send_query()+) and immediately returns with a Deferrable. It then receives the last non-NULL result on :succeed, or nil if no results are available.

@macro deferrable_api
@yieldparam result [PG::Result|Error|nil] command result on success, a PG::Error instance on error, or +nil+ if no results are available.

@see deveiate.org/code/pg/PG/Connection.html#method-i-send_query PG::Connection#send_query
@see deveiate.org/code/pg/PG/Connection.html#method-i-get_last_result PG::Connection#get_last_result

# File lib/pg/em.rb, line 775
def get_last_result_defer(&blk)
  df = FeaturedDeferrable.new
  begin
    if status == CONNECTION_OK
      setup_emio_watcher.watch_results(df)
    else
      df.succeed
    end
  rescue Error => e
    ::EM.next_tick { async_autoreconnect!(df, e) }
  rescue Exception => e
    ::EM.next_tick { df.fail(e) }
  end
  df.completion(&blk) if block_given?
  df
end
get_result() { |result| ... }

Retrieves the next result from a call to send_query (or another asynchronous command). If no more results are available returns nil and the block (if given) is never called.

@macro auto_synchrony_api
@return [nil] if no more results

@see get_result_defer
@see deveiate.org/code/pg/PG/Connection.html#method-i-get_result PG::Connection#get_result

Calls superclass method
# File lib/pg/em.rb, line 990
def get_result
  if is_busy && ::EM.reactor_running? && !(f = Fiber.current).equal?(ROOT_FIBER)
    if (result = fiber_sync get_result_defer, f).is_a?(::Exception)
      raise result
    end
    if block_given? && result
      begin
        yield result
      ensure
        result.clear
      end
    else
      result
    end
  else
    super
  end
end
Also aliased as: blocking_get_result
get_result_defer(&blk)

Asynchronously retrieves the next result from a call to send_query (or another asynchronous command) and immediately returns with a Deferrable. It then receives the result object on :succeed, or nil if no results are available.

@macro deferrable_api
@yieldparam result [PG::Result|Error|nil] command result on success, a PG::Error instance on error, or +nil+ if no results are available.

@see deveiate.org/code/pg/PG/Connection.html#method-i-send_query PG::Connection#send_query
@see deveiate.org/code/pg/PG/Connection.html#method-i-get_result PG::Connection#get_result

# File lib/pg/em.rb, line 741
def get_result_defer(&blk)
  df = FeaturedDeferrable.new
  begin
    if status == CONNECTION_OK
      if is_busy
        setup_emio_watcher.watch_results(df, nil, true)
      else
        df.succeed blocking_get_result
      end
    else
      df.succeed
    end
  rescue Error => e
    ::EM.next_tick { async_autoreconnect!(df, e) }
  rescue Exception => e
    ::EM.next_tick { df.fail(e) }
  end
  df.completion(&blk) if block_given?
  df
end
notifies_wait(timeout = nil)
Alias for: wait_for_notify
notifies_wait_defer(timeout = nil, &blk)
Alias for: wait_for_notify_defer
on_autoreconnect(&hook)
# File lib/pg/em.rb, line 198
def on_autoreconnect(&hook)
  if block_given?
    @on_autoreconnect = hook
  else
    @on_autoreconnect
  end
end
on_connect(&hook)
# File lib/pg/em.rb, line 254
def on_connect(&hook)
  if block_given?
    @on_connect = hook
  else
    @on_connect
  end
end
raise_error(klass=Error, message=error_message)
# File lib/pg/em.rb, line 797
def raise_error(klass=Error, message=error_message)
  error = klass.new(message)
  error.instance_variable_set(:@connection, self)
  raise error
end
reset()

Attempts to reset the connection.

Performs command asynchronously yielding from current fiber if EventMachine reactor is running and current fiber isn’t the root fiber. Other fibers can process while waiting for the server to complete the request.

Otherwise performs a thread-blocking call to the parent method.

@raise [PG::Error]
@see reset_defer
@see deveiate.org/code/pg/PG/Connection.html#method-i-reset PG::Connection#reset

Calls superclass method
# File lib/pg/em.rb, line 418
def reset
  if ::EM.reactor_running? && !(f = Fiber.current).equal?(ROOT_FIBER)
    reset_defer {|r| f.resume(r) }

    conn = Fiber.yield
    raise conn if conn.is_a?(::Exception)
    conn
  else
    @async_command_aborted = false
    if @watcher
      @watcher.detach if @watcher.watching?
      @watcher = nil
    end
    super
    @on_connect.call(self, false, true) if @on_connect
    self
  end
end
reset_defer(&blk)

Attempts to reset the connection asynchronously.

@return [FeaturedDeferrable]
@yieldparam pg [Client|PG::Error] reconnected client instance on success or an instance of raised PG::Error

Pass the block to the returned deferrable’s callback to execute after a successful reset. If the block is provided it’s bound to both the callback and errback hooks of the returned deferrable.

@see deveiate.org/code/pg/PG/Connection.html#method-i-reset PG::Connection#reset

# File lib/pg/em.rb, line 382
def reset_defer(&blk)
  @async_command_aborted = false
  df = FeaturedDeferrable.new(&blk)
  # there can be only one watch handler over the socket
  # apparently eventmachine has hard time dealing with more than one
  if @watcher
    @watcher.detach if @watcher.watching?
    @watcher = nil
  end
  ret = df.protect(:fail) { reset_start }
  unless ret == :fail
    ::EM.watch(self.socket_io, ConnectWatcher, self, df, true).
      poll_connection_and_check
  end
  df
end
Also aliased as: async_reset
single_row_mode?()

Returns true if pg supports single row mode or false otherwise.

@return [Boolean]
@see single_row_mode?

# File lib/pg/em.rb, line 277
def single_row_mode?
  self.class.single_row_mode?
end
status()

Returns status of connection: PG::CONNECTION_OK or PG::CONNECTION_BAD.

@return [Number]

Returns PG::CONNECTION_BAD for connections whose async_command_aborted flag was set by an expired query timeout. Otherwise returns whatever PG::Connection#status returns.

@see deveiate.org/code/pg/PG/Connection.html#method-i-status PG::Connection#status

Calls superclass method
# File lib/pg/em.rb, line 516
def status
  if @async_command_aborted
    CONNECTION_BAD
  else
    super
  end
end
transaction() { |self| ... }

Executes a BEGIN at the start of the block and a COMMIT at the end of the block or ROLLBACK if any exception occurs.

@note Avoid using PG::EM::Client#*_defer calls inside the block or make sure all queries are completed before the provided block terminates.

@return [Object] result of the block
@yieldparam client [self]
@see deveiate.org/code/pg/PG/Connection.html#method-i-transaction PG::Connection#transaction

Calls to {#transaction} may be nested, however without sub-transactions (save points). If the innermost transaction block raises an error the transaction is rolled back to the state before the outermost transaction began.

This is an extension to the +PG::Connection#transaction+ method, since the parent method does not support nesting in this way.

The method is sensitive to the transaction status and will safely roll back on any SQL error, even when the error was caught by some rescue block. Note, however, that rescuing SQL errors within a utility method is a bad idea.

This method works in both blocking and async modes (regardless of the reactor state) and is considered a generic extension to the +PG::Connection#transaction+ method.
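
The nesting rule (only the outermost block begins and ends the transaction) can be modelled without a database. The sketch below is a simplified illustration, not the method's implementation: +RecordingConnection+ is hypothetical, it tracks nesting depth only, ignores the server-side transaction status that the real method consults, and uses the strings 'BEGIN', 'COMMIT' and 'ROLLBACK' as placeholders for the statements sent.

```ruby
# Hypothetical stand-in that records statements instead of sending them.
class RecordingConnection
  attr_reader :statements

  def initialize
    @statements = []
    @depth = 0
  end

  def transaction
    outermost = @depth.zero?
    @statements << 'BEGIN' if outermost
    @depth += 1
    begin
      result = yield self
    rescue
      # Inner blocks just re-raise; the outermost one rolls back.
      @statements << 'ROLLBACK' if outermost
      raise
    else
      @statements << 'COMMIT' if outermost
      result
    ensure
      @depth -= 1
    end
  end
end

db = RecordingConnection.new
db.transaction do
  db.transaction { db.statements << 'insert ...' }
  db.transaction { db.statements << 'update ...' }
end

failed = RecordingConnection.new
begin
  failed.transaction do
    failed.transaction { raise 'boom' }
  end
rescue RuntimeError
  # the error propagates to the caller after the rollback
end
```

In the first run the two inner blocks share one BEGIN/COMMIT pair; in the second the error raised in the inner block rolls back everything done since the outermost block began.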

@example Nested transaction example

def add_comment(user_id, text)
  db.transaction do
    cmt_id = db.query(
      'insert into comments (user_id, text) values ($1, $2) returning id',
      [user_id, text]).getvalue(0,0)
    db.query(
      'update users set last_comment_id=$2 where id=$1', [user_id, cmt_id])
    cmt_id
  end
end

def update_comment_count(page_id)
  db.transaction do
    count = db.query('select count(*) from comments where page_id=$1', [page_id]).getvalue(0,0)
    db.query('update pages set comment_count=$2 where id=$1', [page_id, count])
  end
end

# to run add_comment and update_comment_count within the same transaction
db.transaction do
  add_comment(user_id, some_text)
  update_comment_count(page_id)
end
# File lib/pg/em.rb, line 1067
def transaction
  raise ArgumentError, 'Must supply block for PG::EM::Client#transaction' unless block_given?
  tcount = @client_tran_count.to_i

  case transaction_status
  when PQTRANS_IDLE
    # there is no transaction yet, so let's begin
    exec(TRAN_BEGIN_QUERY)
    # reset transaction count in case user code rolled it back before
    tcount = 0 if tcount != 0
  when PQTRANS_INTRANS
    # transaction in progress, leave it be
  else
    # transaction failed, is in unknown state or command is active
    # in any case calling begin will raise server transaction error
    exec(TRAN_BEGIN_QUERY) # raises PG::InFailedSqlTransaction
  end
  # memoize nested count
  @client_tran_count = tcount + 1
  begin

    result = yield self

  rescue
    # error was raised
    case transaction_status
    when PQTRANS_INTRANS, PQTRANS_INERROR
      # do not rollback if transaction was rolled back before
      # or is in unknown state, which means connection reset is needed
      # and rollback only from the outermost transaction block
      exec(TRAN_ROLLBACK_QUERY) if tcount.zero?
    end
    # raise again
    raise
  else
    # we are good (but not out of woods yet)
    case transaction_status
    when PQTRANS_INTRANS
      # commit only from the outermost transaction block
      exec(TRAN_COMMIT_QUERY) if tcount.zero?
    when PQTRANS_INERROR
      # no ruby error was raised (or an error was rescued in code block)
      # but there was an sql error anyway
      # so rollback after the outermost block
      exec(TRAN_ROLLBACK_QUERY) if tcount.zero?
    when PQTRANS_IDLE
      # the code block has terminated the transaction on its own
      # so just reset the counter
      tcount = 0
    else
      # something isn't right, so provoke an error just in case
      exec(TRAN_ROLLBACK_QUERY) if tcount.zero?
    end
    result
  ensure
    @client_tran_count = tcount
  end
end
wait_for_notify(timeout = nil) { |name, pid, payload| ... }

Blocks while waiting for notification(s), or until the optional timeout is reached, whichever comes first. Returns nil if timeout is reached, the name of the NOTIFY event otherwise.

If EventMachine reactor is running and the current fiber isn’t the root fiber this method performs command asynchronously yielding current fiber. Other fibers can process while the current one is waiting for notifications.

Otherwise performs a blocking call to a parent method.

@return [String|nil]
@yieldparam name [String] the name of the NOTIFY event
@yieldparam pid [Number] the generating pid
@yieldparam payload [String] the optional payload
@raise [PG::Error]

@see deveiate.org/code/pg/PG/Connection.html#method-i-wait_for_notify PG::Connection#wait_for_notify

Calls superclass method
# File lib/pg/em.rb, line 961
def wait_for_notify(timeout = nil)
  if ::EM.reactor_running? && !(f = Fiber.current).equal?(ROOT_FIBER)
    unless notify_hash = notifies
      if (notify_hash = fiber_sync wait_for_notify_defer(timeout), f).is_a?(::Exception)
        raise notify_hash
      end
    end
    if notify_hash
      if block_given?
        yield notify_hash.values_at(:relname, :be_pid, :extra)
      end
      notify_hash[:relname]
    end
  else
    super
  end
end
wait_for_notify_defer(timeout = nil, &blk)

Asynchronously waits for notification or until the optional timeout is reached, whichever comes first. timeout is measured in seconds and can be fractional. Returns immediately with a Deferrable.

Pass the block to the returned deferrable’s callback to obtain notification hash. In case of connection error errback hook is called with an error object. If the timeout is reached nil is passed to deferrable’s callback. If the block is provided it’s bound to both the callback and errback hooks of the returned deferrable. If another call is made to this method before the notification is received (or before reaching timeout) the previous deferrable’s errback will be called with nil argument.

@return [FeaturedDeferrable]
@yieldparam notification [Hash|nil|Error] notification hash or a PG::Error instance on error or nil when timeout is reached or canceled.

@see deveiate.org/code/pg/PG/Connection.html#method-i-notifies PG::Connection#notifies

# File lib/pg/em.rb, line 708
def wait_for_notify_defer(timeout = nil, &blk)
  df = FeaturedDeferrable.new
  begin
    check_async_command_aborted!
    if status == CONNECTION_OK
      setup_emio_watcher.watch_notify(df, timeout)
    else
      raise_error ConnectionBad
    end
  rescue Error => e
    ::EM.next_tick { async_autoreconnect!(df, e) }
  rescue Exception => e
    ::EM.next_tick { df.fail(e) }
  end
  df.completion(&blk) if block_given?
  df
end
Also aliased as: notifies_wait_defer

Private Instance Methods

check_async_command_aborted!()
# File lib/pg/em.rb, line 814
def check_async_command_aborted!
  if @async_command_aborted
    raise_error ConnectionBad, "previous query expired, need connection reset"
  end
end
fiber_sync(df, fiber)
# File lib/pg/em.rb, line 805
def fiber_sync(df, fiber)
  f = nil
  df.completion do |res|
    if f then f.resume res else return res end
  end
  f = fiber
  Fiber.yield
end
setup_emio_watcher()
# File lib/pg/em.rb, line 820
def setup_emio_watcher
  if @watcher && @watcher.watching?
    @watcher
  else
    @watcher = ::EM.watch(self.socket_io, Watcher, self)
  end
end