class PG::EM::Client
PostgreSQL EventMachine client¶ ↑
- Author
-
Rafal Michalski
{PG::EM::Client} is a PG::Connection wrapper designed for EventMachine.
The following new methods:
-
{#exec_defer} (alias:
query_defer
) -
{#exec_params_defer}
-
{#prepare_defer}
-
{#exec_prepared_defer}
-
{#describe_prepared_defer}
-
{#describe_portal_defer}
-
{#get_result_defer}
-
{#get_last_result_defer}
are added to execute queries asynchronously, returning Deferrable
object.
The following methods of PG::Connection are overloaded:
-
{#exec} (alias:
query
,async_exec
,async_query
) -
{#exec_params}
-
{#prepare}
-
{#exec_prepared}
-
{#describe_prepared}
-
{#describe_portal}
-
{#get_result}
-
{#get_last_result}
and are now auto-detecting if EventMachine is running and performing commands asynchronously (blocking only current fiber) or calling parent thread-blocking methods.
If {#async_autoreconnect} option is set to true
, all of the above methods (in asynchronous mode) try to re-connect after a connection error occurs. It’s performed behind the scenes, so no error is raised, except if there was a transaction in progress. In such instance the error is raised after establishing connection to signal that the transaction was aborted.
If you want to detect auto re-connect event use {#on_autoreconnect} property/option.
To enable auto-reconnecting set:
client.async_autoreconnect = true
or pass as {new} hash argument:
PG::EM::Client.new dbname: 'bar', async_autoreconnect: true
There are also new methods:
-
{Client.connect_defer}
-
{#reset_defer}
which are asynchronous versions of PG::Connection.new and PG:Connection#reset.
Additionally the following methods are overloaded:
-
{new} (alias:
connect
,open
,setdb
,setdblogin
) -
{#reset}
providing auto-detecting asynchronous (fiber-synchronized) or thread-blocking methods for (re)connecting.
Otherwise nothing changes in PG::Connection API. See PG::Connection docs for explanation of arguments to the above methods.
Warning:
{#describe_prepared} and {#exec_prepared} after {#prepare} should only be invoked on the same connection. If you are using a {ConnectionPool}, make sure to acquire a single connection first or execute prepare
command on every connection using #on_connect
hook.
Constants
- DEFAULT_ASYNC_VARS
- TRAN_BEGIN_QUERY
@!endgroup
- TRAN_COMMIT_QUERY
- TRAN_ROLLBACK_QUERY
Attributes
@!attribute async_autoreconnect
@return [Boolean] asynchronous auto re-connect status Enable/disable auto re-connect feature (+true+/+false+). Defaults to +false+ unless {#on_autoreconnect} is specified in a +connection_hash+. Changing {#on_autoreconnect} with accessor method doesn't change the state of {#async_autoreconnect}. You may set +:async_autoreconnect+ in a +connection_hash+ argument passed to {new} or {connect_defer}.
@!visibility private Used internally for marking connection as aborted on query timeout.
@!attribute connect_timeout
@return [Float] connection timeout in seconds Connection timeout. Affects {#reset} and {#reset_defer}. Changing this property does not affect thread-blocking {#reset} unless passed as a +connection_hash+. To enable it set to some positive value. To disable it: set to 0. You may set +:connect_timeout+ in a +connection_hash+ argument passed to {new} or {connect_defer}.
@!attribute on_autoreconnect
@return [Proc<Client, Error>] auto re-connect hook A proc like object that is being called after a connection with the server has been automatically re-established. It's being invoked just before the pending command is sent to the server. @yieldparam pg [Client] re-connected client instance @yieldparam error [Exception] an error after which the auto re-connect process began. @yieldreturn [false|true|Exception|EM::Deferrable|*] The first argument it receives is the connected {Client} instance. The second is the original +error+ that caused the reconnecting process. It's possible to execute queries from the +on_autoreconnect+ hook. Code is being executed in a fiber context, so both deferrable and fiber-synchronized query commands may be used. If exception is raised during execution of the +on_autoreconnect+ hook the reset operation will fail with that exception. The hook can control later actions with its return value: - +false+ (explicitly, +nil+ is ignored) - the original +exception+ is raised/passed back and the pending query command is not sent again to the server. - +true+ (explicitly, truish values are ignored), the pending command is called regardless of the connection's last transaction status. - +Exception+ object - is raised/passed back and the pending command is not sent. - +Deferrable+ object - the chosen action will depend on the returned deferrable status. - Other values are ignored and the pending query command is immediately sent to the server unless there was a transaction in progress before the connection was reset. If both +on_connect+ and +on_autoreconnect+ hooks are set, the +on_connect+ is being called first and +on_autoreconnect+ is called only when +on_connect+ succeeds. You may set +:on_autoreconnect+ hook in a +connection_hash+ argument passed to {new} or {connect_defer}. @example How to use deferrable in on_autoreconnect hook pg.on_autoreconnect do |pg, e| logger.warn "PG connection was reset: #{e.inspect}, delaying 1 sec." EM::DefaultDeferrable.new.tap do |df| EM.add_timer(1) { df.succeed } end end
@!attribute on_connect
@return [Proc<Client,is_async,is_reset>] connect hook A proc like object that is being called after a connection with the server has been established. @yieldparam pg [Client] connected client instance @yieldparam is_async [Boolean] flag indicating if the connection was established asynchronously @yieldparam is_reset [Boolean] flag indicating if the connection client was reset @yieldreturn [EM::Deferrable|*] The first argument it receives is the connected {Client} instance. The second argument is +true+ if the connection was established in asynchronous manner, +false+ otherwise. The third argument is +true+ when the connection has been reset or +false+ on new connection. It's possible to execute queries from the +on_connect+ hook. Code is being executed in a fiber context, so both deferrable and fiber-synchronized query commands may be used. However deferrable commands will work only if eventmachine reactor is running, so check if +is_async+ is +true+. If exception is raised during execution of the +on_connect+ hook the connecting/reset operation will fail with that exception. The hook can control later actions with its return value: - +Deferrable+ object - the connection establishing status will depend on the returned deferrable status (only in asynchronous mode). - Other values are ignored. If both +on_connect+ and +on_autoreconnect+ hooks are set, the +on_connect+ is being called first and +on_autoreconnect+ is called only when +on_connect+ succeeds. You may set +:on_connect+ hook in a +connection_hash+ argument passed to {new} or {connect_defer}. @example How to use prepare in on_connect hook PG::EM::Client.new(on_connect: proc {|pg| pg.prepare("species_by_name", "select id, name from animals where species=$1 order by name") })
@!attribute query_timeout
@return [Float] query timeout in seconds Aborts async command processing if server response time exceedes +query_timeout+ seconds. This does not apply to {#reset} and {#reset_defer}. To enable it set to some positive value. To disable it: set to 0. You may set +:query_timeout+ in a +connection_hash+ argument passed to {new} or {connect_defer}.
Public Class Methods
Attempts to establish the connection asynchronously.
@return [FeaturedDeferrable] @yieldparam pg [Client|PG::Error] new and connected client instance on
success or an instance of raised PG::Error
Pass the block to the returned deferrable’s callback
to obtain newly created and already connected {Client} object. In case of connection error errback
hook receives an error object as an argument. If the block is provided it’s bound to both callback
and errback
hooks of the returned deferrable.
Special {Client} options (e.g.: {#async_autoreconnect}) must be provided in a connection_hash
argument variant. They will be ignored if passed in a connection_string
.
client_encoding
will be set according to Encoding.default_internal
.
@see deveiate.org/code/pg/PG/Connection.html#method-c-new PG::Connection.new
# File lib/pg/em.rb, line 354 def self.connect_defer(*args, &blk) df = FeaturedDeferrable.new(&blk) async_args = parse_async_options(args) conn = df.protect { connect_start(*args) } if conn async_args.each {|k, v| conn.instance_variable_set(k, v) } ::EM.watch(conn.socket_io, ConnectWatcher, conn, df, false). poll_connection_and_check end df end
Creates new instance of PG::EM::Client
and attempts to establish connection.
Performs command asynchronously yielding from current fiber if EventMachine reactor is running and current fiber isn’t the root fiber. Other fibers can process while waiting for the server to complete the request.
Otherwise performs a thread-blocking call to the parent method.
@raise [PG::Error]
Special {Client} options (e.g.: {#async_autoreconnect}) must be provided in a connection_hash
argument variant. They will be ignored if passed in a connection_string
.
client_encoding
will be set according to Encoding.default_internal
.
@see deveiate.org/code/pg/PG/Connection.html#method-c-new PG::Connection.new
# File lib/pg/em.rb, line 456 def self.new(*args, &blk) if ::EM.reactor_running? && !(f = Fiber.current).equal?(ROOT_FIBER) connect_defer(*args) {|r| f.resume(r) } conn = Fiber.yield raise conn if conn.is_a?(::Exception) if block_given? begin yield conn ensure conn.finish end else conn end else conn = super(*args) if on_connect = conn.on_connect on_connect.call(conn, false, false) end conn end end
@!visibility private
# File lib/pg/em.rb, line 481 def initialize(*args) Client.parse_async_options(args).each {|k, v| instance_variable_set(k, v) } super(*args) end
@!visibility private
# File lib/pg/em.rb, line 294 def self.parse_async_options(args) options = DEFAULT_ASYNC_VARS.dup if args.last.is_a? Hash args[-1] = args.last.reject do |key, value| case key.to_sym when :async_autoreconnect options[:@async_autoreconnect] = value true when :on_connect if value.respond_to? :call options[:@on_connect] = value else raise ArgumentError, "on_connect must respond to `call'" end true when :on_reconnect raise ArgumentError, "on_reconnect is no longer supported, use on_autoreconnect" when :on_autoreconnect if value.respond_to? :call options[:@on_autoreconnect] = value options[:@async_autoreconnect] = true if options[:@async_autoreconnect].nil? else raise ArgumentError, "on_autoreconnect must respond to `call'" end true when :connect_timeout options[:@connect_timeout] = value.to_f false when :query_timeout options[:@query_timeout] = value.to_f true end end end options[:@async_autoreconnect] = !!options[:@async_autoreconnect] options[:@connect_timeout] ||= ENV[@@connect_timeout_envvar].to_f options end
Returns true
if pg
supports single row mode or false
otherwise. Single row mode is available since libpq
9.2. @return [Boolean] @see deveiate.org/code/pg/PG/Connection.html#method-i-set_single_row_mode PG::Connection#set_single_row_mode
# File lib/pg/em.rb, line 270 def self.single_row_mode? method_defined? :set_single_row_mode end
Public Instance Methods
@!visibility private Perform auto re-connect. Used internally.
# File lib/pg/em.rb, line 526 def async_autoreconnect!(deferrable, error, send_proc = nil, &on_connection_bad) # reconnect only if connection is bad and flag is set if self.status == CONNECTION_BAD yield if block_given? if async_autoreconnect # check if transaction was active was_in_transaction = case @last_transaction_status when PQTRANS_IDLE, PQTRANS_UNKNOWN false else true end # reset asynchronously reset_df = reset_defer # just fail on reset failure reset_df.errback { |ex| deferrable.fail ex } # reset succeeds reset_df.callback do # handle on_autoreconnect if on_autoreconnect # wrap in a fiber, so on_autoreconnect code may yield from it Fiber.new do # call on_autoreconnect handler and fail if it raises an error returned_df = begin on_autoreconnect.call(self, error) rescue => ex ex end if returned_df.respond_to?(:callback) && returned_df.respond_to?(:errback) # the handler returned a deferrable returned_df.callback do if was_in_transaction || !send_proc # fail anyway, there was a transaction in progress or in single result mode deferrable.fail error else # try to call failed query command again deferrable.protect(&send_proc) end end # fail when handler's deferrable fails returned_df.errback { |ex| deferrable.fail ex } elsif returned_df.is_a?(Exception) # tha handler returned an exception object, so fail with it deferrable.fail returned_df elsif returned_df == false || !send_proc || (was_in_transaction && returned_df != true) # tha handler returned false or in single result mode # or there was an active transaction and handler didn't return true deferrable.fail error else # try to call failed query command again deferrable.protect(&send_proc) end end.resume elsif was_in_transaction || !send_proc # there was a transaction in progress or in single result mode; # fail anyway deferrable.fail error else # no on_autoreconnect handler, no transaction # try to call failed query command again deferrable.protect(&send_proc) end end # connection is bad, reset in progress, all done return end end # connection is either good or bad, the async_autoreconnect is not set deferrable.fail error end
Closes the backend connection.
Detaches watch handler to prevent memory leak after calling parent PG::Connection#finish. @see deveiate.org/code/pg/PG/Connection.html#method-i-finish PG::Connection#finish
# File lib/pg/em.rb, line 500 def finish super if @watcher @watcher.detach if @watcher.watching? @watcher = nil end end
Asynchronously retrieves all available results on the current connection (from previously issued asynchronous commands like +send_query()+) and immediately returns with a Deferrable. It then receives the last non-NULL result on :succeed, or nil
if no results are available.
@macro deferrable_api @yieldparam result [PG::Result|Error|nil] command result on success or a PG::Error instance on error
or +nil+ if no results are available.
@see deveiate.org/code/pg/PG/Connection.html#method-i-send_query PG::Connection#send_query @see deveiate.org/code/pg/PG/Connection.html#method-i-get_last_result PG::Connection#get_last_result
# File lib/pg/em.rb, line 775 def get_last_result_defer(&blk) df = FeaturedDeferrable.new begin if status == CONNECTION_OK setup_emio_watcher.watch_results(df) else df.succeed end rescue Error => e ::EM.next_tick { async_autoreconnect!(df, e) } rescue Exception => e ::EM.next_tick { df.fail(e) } end df.completion(&blk) if block_given? df end
Retrieves the next result from a call to send_query (or another asynchronous command). If no more results are available returns nil
and the block (if given) is never called.
@macro auto_synchrony_api @return [nil] if no more results
@see get_result_defer
@see deveiate.org/code/pg/PG/Connection.html#method-i-get_result PG::Connection#get_result
# File lib/pg/em.rb, line 990 def get_result if is_busy && ::EM.reactor_running? && !(f = Fiber.current).equal?(ROOT_FIBER) if (result = fiber_sync get_result_defer, f).is_a?(::Exception) raise result end if block_given? && result begin yield result ensure result.clear end else result end else super end end
Asynchronously retrieves the next result from a call to send_query (or another asynchronous command) and immediately returns with a Deferrable. It then receives the result object on :succeed, or nil
if no results are available.
@macro deferrable_api @yieldparam result [PG::Result|Error|nil] command result on success or a PG::Error instance on error
or +nil+ if no results are available.
@see deveiate.org/code/pg/PG/Connection.html#method-i-send_query PG::Connection#send_query @see deveiate.org/code/pg/PG/Connection.html#method-i-get_result PG::Connection#get_result
# File lib/pg/em.rb, line 741 def get_result_defer(&blk) df = FeaturedDeferrable.new begin if status == CONNECTION_OK if is_busy setup_emio_watcher.watch_results(df, nil, true) else df.succeed blocking_get_result end else df.succeed end rescue Error => e ::EM.next_tick { async_autoreconnect!(df, e) } rescue Exception => e ::EM.next_tick { df.fail(e) } end df.completion(&blk) if block_given? df end
# File lib/pg/em.rb, line 198 def on_autoreconnect(&hook) if block_given? @on_autoreconnect = hook else @on_autoreconnect end end
# File lib/pg/em.rb, line 254 def on_connect(&hook) if block_given? @on_connect = hook else @on_connect end end
# File lib/pg/em.rb, line 797 def raise_error(klass=Error, message=error_message) error = klass.new(message) error.instance_variable_set(:@connection, self) raise error end
Attempts to reset the connection.
Performs command asynchronously yielding from current fiber if EventMachine reactor is running and current fiber isn’t the root fiber. Other fibers can process while waiting for the server to complete the request.
Otherwise performs a thread-blocking call to the parent method.
@raise [PG::Error] @see reset_defer
@see deveiate.org/code/pg/PG/Connection.html#method-i-reset PG::Connection#reset
# File lib/pg/em.rb, line 418 def reset if ::EM.reactor_running? && !(f = Fiber.current).equal?(ROOT_FIBER) reset_defer {|r| f.resume(r) } conn = Fiber.yield raise conn if conn.is_a?(::Exception) conn else @async_command_aborted = false if @watcher @watcher.detach if @watcher.watching? @watcher = nil end super @on_connect.call(self, false, true) if @on_connect self end end
Attempts to reset the connection asynchronously.
@return [FeaturedDeferrable] @yieldparam pg [Client|PG::Error] reconnected client instance on
success or an instance of raised PG::Error
Pass the block to the returned deferrable’s callback
to execute after successfull reset. If the block is provided it’s bound to callback
and errback
hooks of the returned deferrable. @see deveiate.org/code/pg/PG/Connection.html#method-i-reset PG::Connection#reset
# File lib/pg/em.rb, line 382 def reset_defer(&blk) @async_command_aborted = false df = FeaturedDeferrable.new(&blk) # there can be only one watch handler over the socket # apparently eventmachine has hard time dealing with more than one if @watcher @watcher.detach if @watcher.watching? @watcher = nil end ret = df.protect(:fail) { reset_start } unless ret == :fail ::EM.watch(self.socket_io, ConnectWatcher, self, df, true). poll_connection_and_check end df end
Returns true
if pg
supports single row mode or false
otherwise. @return [Boolean] @see single_row_mode?
# File lib/pg/em.rb, line 277 def single_row_mode? self.class.single_row_mode? end
Returns status of connection: PG::CONNECTION_OK or PG::CONNECTION_BAD.
@return [Number] Returns PG::CONNECTION_BAD
for connections with async_command_aborted
flag set by expired query timeout. Otherwise return whatever PG::Connection#status returns. @see deveiate.org/code/pg/PG/Connection.html#method-i-status PG::Connection#status
# File lib/pg/em.rb, line 516 def status if @async_command_aborted CONNECTION_BAD else super end end
Executes a BEGIN at the start of the block and a COMMIT at the end of the block or ROLLBACK if any exception occurs.
@note Avoid using PG::EM::Client#*_defer calls inside the block or make sure
all queries are completed before the provided block terminates.
@return [Object] result of the block @yieldparam client [self] @see deveiate.org/code/pg/PG/Connection.html#method-i-transaction PG::Connection#transaction
Calls to {#transaction} may be nested, however without sub-transactions (save points). If the innermost transaction block raises an error the transaction is rolled back to the state before the outermost transaction began.
This is an extension to the +PG::Connection#transaction+ method as it does not support nesting in this way.
The method is sensitive to the transaction status and will safely rollback on any sql error even when it was catched by some rescue block. But consider that rescuing any sql error within an utility method is a bad idea.
This method works in both blocking/async modes (regardles of the reactor state) and is considered as a generic extension to the +PG::Connection#transaction+ method.
@example Nested transaction example
def add_comment(user_id, text) db.transaction do cmt_id = db.query( 'insert into comments (text) where user_id=$1 values ($2) returning id', [user_id, text]).getvalue(0,0) db.query( 'update users set last_comment_id=$2 where id=$1', [user_id, cmt_id]) cmt_id end end def update_comment_count(page_id) db.transaction do count = db.query('select count(*) from comments where page_id=$1', [page_id]).getvalue(0,0) db.query('update pages set comment_count=$2 where id=$1', [page_id, count]) end end # to run add_comment and update_comment_count within the same transaction db.transaction do add_comment(user_id, some_text) update_comment_count(page_id) end
# File lib/pg/em.rb, line 1067 def transaction raise ArgumentError, 'Must supply block for PG::EM::Client#transaction' unless block_given? tcount = @client_tran_count.to_i case transaction_status when PQTRANS_IDLE # there is no transaction yet, so let's begin exec(TRAN_BEGIN_QUERY) # reset transaction count in case user code rolled it back before tcount = 0 if tcount != 0 when PQTRANS_INTRANS # transaction in progress, leave it be else # transaction failed, is in unknown state or command is active # in any case calling begin will raise server transaction error exec(TRAN_BEGIN_QUERY) # raises PG::InFailedSqlTransaction end # memoize nested count @client_tran_count = tcount + 1 begin result = yield self rescue # error was raised case transaction_status when PQTRANS_INTRANS, PQTRANS_INERROR # do not rollback if transaction was rolled back before # or is in unknown state, which means connection reset is needed # and rollback only from the outermost transaction block exec(TRAN_ROLLBACK_QUERY) if tcount.zero? end # raise again raise else # we are good (but not out of woods yet) case transaction_status when PQTRANS_INTRANS # commit only from the outermost transaction block exec(TRAN_COMMIT_QUERY) if tcount.zero? when PQTRANS_INERROR # no ruby error was raised (or an error was rescued in code block) # but there was an sql error anyway # so rollback after the outermost block exec(TRAN_ROLLBACK_QUERY) if tcount.zero? when PQTRANS_IDLE # the code block has terminated the transaction on its own # so just reset the counter tcount = 0 else # something isn't right, so provoke an error just in case exec(TRAN_ROLLBACK_QUERY) if tcount.zero? end result ensure @client_tran_count = tcount end end
Blocks while waiting for notification(s), or until the optional timeout
is reached, whichever comes first. Returns nil
if timeout
is reached, the name of the NOTIFY
event otherwise.
If EventMachine reactor is running and the current fiber isn’t the root fiber this method performs command asynchronously yielding current fiber. Other fibers can process while the current one is waiting for notifications.
Otherwise performs a blocking call to a parent method. @return [String|nil] @yieldparam name [String] the name of the NOTIFY
event @yieldparam pid [Number] the generating pid @yieldparam payload [String] the optional payload @raise [PG::Error]
@see deveiate.org/code/pg/PG/Connection.html#method-i-wait_for_notify PG::Connection#wait_for_notify
# File lib/pg/em.rb, line 961 def wait_for_notify(timeout = nil) if ::EM.reactor_running? && !(f = Fiber.current).equal?(ROOT_FIBER) unless notify_hash = notifies if (notify_hash = fiber_sync wait_for_notify_defer(timeout), f).is_a?(::Exception) raise notify_hash end end if notify_hash if block_given? yield notify_hash.values_at(:relname, :be_pid, :extra) end notify_hash[:relname] end else super end end
Asynchronously waits for notification or until the optional timeout
is reached, whichever comes first. timeout
is measured in seconds and can be fractional. Returns immediately with a Deferrable.
Pass the block to the returned deferrable’s callback
to obtain notification hash. In case of connection error errback
hook is called with an error object. If the timeout
is reached nil
is passed to deferrable’s callback
. If the block is provided it’s bound to both the callback
and errback
hooks of the returned deferrable. If another call is made to this method before the notification is received (or before reaching timeout) the previous deferrable’s errback
will be called with nil
argument.
@return [FeaturedDeferrable] @yieldparam notification [Hash|nil|Error] notification hash or a PG::Error instance on error
or nil when timeout is reached or canceled.
@see deveiate.org/code/pg/PG/Connection.html#method-i-notifies PG::Connection#notifies
# File lib/pg/em.rb, line 708 def wait_for_notify_defer(timeout = nil, &blk) df = FeaturedDeferrable.new begin check_async_command_aborted! if status == CONNECTION_OK setup_emio_watcher.watch_notify(df, timeout) else raise_error ConnectionBad end rescue Error => e ::EM.next_tick { async_autoreconnect!(df, e) } rescue Exception => e ::EM.next_tick { df.fail(e) } end df.completion(&blk) if block_given? df end
Private Instance Methods
# File lib/pg/em.rb, line 814 def check_async_command_aborted! if @async_command_aborted raise_error ConnectionBad, "previous query expired, need connection reset" end end
# File lib/pg/em.rb, line 805 def fiber_sync(df, fiber) f = nil df.completion do |res| if f then f.resume res else return res end end f = fiber Fiber.yield end
# File lib/pg/em.rb, line 820 def setup_emio_watcher if @watcher && @watcher.watching? @watcher else @watcher = ::EM.watch(self.socket_io, Watcher, self) end end