class PGx::Connection

@author Kiriakos Georgiou, www.mockbites.com/about Extends the base class PG::Connection methods by adding logging and retrying capabilities (synchronous methods only.) The arguments and method behaviors are identical, except for the 'new' method. @see www.mockbites.com/articles/tech/pgx pgx documentation @see deveiate.org/code/pg/PG/Connection.html Base class methods

Public Class Methods

new(*args) click to toggle source

@param [Logger] logger: a logger object, or a lambda returning the

logger object.  Note that a lambda returning the logger is
preferable because it makes logging of the log object itself
easier on the eyes.  This parameter can be a single logger
object, or an array of logger objects in case you want to log to
multiple logs (eg: stdout and a log file.)

@param [Proc] connect_init: a Proc that contains code that will

always be executed every time a new connection is established.
It is also executed after a reset().
May be useful for things such as setting the search_path.

@param [Proc, Integer] connect_retry_sleep: a Proc or Integer.

If a Proc is passed, the Proc will be called with one argument,
try_count, which starts at 1 and increments by one with every
unsuccessful connection attempt.  The Proc should implement the
algorithm for sleeping between connection attempts.
If an Integer is passed, it indicates how long to sleep between
connection attempts, in seconds.
If nil is passed, no attempts to reconnect will be made, it
simply raises an exception.

@param [Integer] method_try_count_max: the maximum number of times a

single method, or transaction block, can be tried

@param [Array<String>] method_retriable_error_states: the set of

error states for which a single method, or transaction block,
should be retried

@param … The rest of the arguments are the same as the base

class method arguments

@see deveiate.org/code/pg/PG/Connection.html#method-c-new

Base class method.

@example

log = Logger.new('/tmp/debug.log');
log.level = Logger::DEBUG
connect_sleeper = lambda { |try_count|
    sleep [try_count ** 2, 120].min
}
con = PGx::Connection.new(
    logger: log,
    connect_retry_sleep: connect_sleeper,
    dbname: 'test'
)
Calls superclass method
# File lib/pgx.rb, line 88
# Builds a connection with logging and retry support.
#
# The pgx-specific options (the keys of PGx::CONNECT_ARGS) are split off the
# leading option hash by extract_our_args; everything else is forwarded
# unchanged to the superclass constructor.  Each pgx option falls back to the
# value stored in PGx::CONNECT_ARGS when the caller did not supply it.
#
# The method does not return until connect_loop() returns.
def initialize(*args)
    connect_args = PGx.const_get('CONNECT_ARGS')
    (our_args, parent_args) = extract_our_args(connect_args, args)

    # The logger may be a single object, an array of them, or a Proc that
    # returns either; normalize to a flat array.  compact drops a nil logger
    # so the log_* helpers simply have nothing to iterate over.
    log = arg_with_default(connect_args, our_args, :logger)
    @logger = [ log.is_a?(Proc) ? log.call : log ].flatten.compact
    @connect_init = arg_with_default(connect_args, our_args, :connect_init)
    @connect_retry_sleep = arg_with_default(connect_args, our_args, :connect_retry_sleep)
    @method_try_count_max = arg_with_default(connect_args, our_args, :method_try_count_max)

    # always include invalid_sql_statement_name (required for prepared statements to work after re-connects)
    # Array#| builds a NEW array, so neither the shared default held in
    # CONNECT_ARGS nor an array owned by the caller is modified in place.
    retriable_states = arg_with_default(connect_args, our_args, :method_retriable_error_states)
    @method_retriable_error_states = Array(retriable_states) | ['26000']

    # gets set in connect_log_and_try() to the connection handle
    # needed by reset() to be able to execute the connect_init proc
    @connection = nil

    @in_transaction_block = false

    @prepared_statements = {}  # hash of all statements ever prepared, key = statement name, value = sql
    @prepared_live = {} # keeps track which prepared statements are currently prepared
    @transaction_reprepare = nil # placeholder for statement to be prepared before transaction retry

    # Stored as a lambda so connect_loop() can re-run the whole connect
    # sequence later (e.g. after a lost connection).
    @connect_proc = lambda {
        @prepared_live = {} # reset on connect
        connect_log_and_try(
            lambda { super(*parent_args) },
            our_args,
            parent_args
        )
    }
    connect_loop()
end

Public Instance Methods

PG_close(*args)
Alias for: close
cancel(*args) click to toggle source
Calls superclass method
# File lib/pgx.rb, line 192
# Wrapper around the superclass cancel().
# The bare +super+ inside the lambda forwards this method's own arguments;
# the lambda, the method name and the argument list are handed to
# sql_log_and_try, which performs the actual call.
def cancel(*args)
    parent_call = lambda { super }
    sql_log_and_try(parent_call, __method__, args)
end
close(*args) click to toggle source
Calls superclass method
# File lib/pgx.rb, line 196
# Wrapper around the superclass close().
# The parent call is wrapped in a lambda (bare +super+ forwards the original
# arguments) and executed by sql_log_and_try together with the method name
# and argument list.
def close(*args)
    parent_call = lambda { super }
    sql_log_and_try(parent_call, __method__, args)
end
Also aliased as: PG_close
exec(*args) click to toggle source

@example

con.exec "insert into mytable(x) values('test value')"
con.exec 'insert into mytable(x) values($1)', ['test value']
Calls superclass method
# File lib/pgx.rb, line 138
# Wrapper around the superclass exec().
# The parent call is deferred in a lambda (bare +super+ forwards the original
# arguments) and run by sql_log_and_try, which also receives the method name
# and the argument list.
def exec(*args)
    parent_call = lambda { super }
    sql_log_and_try(parent_call, __method__, args)
end
exec_params(*args) click to toggle source

@example

con.exec_params 'select * from mytable where x = $1 limit $2', [1, 100] do |rs|
    puts rs.fields.collect { |fname| "%-15s" % [fname] }.join('')
    rs.values.collect { |row|
        puts row.collect { |col| "%-15s" % [col] }.join('')
    }
end
Calls superclass method
# File lib/pgx.rb, line 154
# Wrapper around the superclass exec_params().
# The parent call is deferred in a lambda (bare +super+ forwards the original
# arguments) and run by sql_log_and_try, which also receives the method name
# and the argument list.
def exec_params(*args)
    parent_call = lambda { super }
    sql_log_and_try(parent_call, __method__, args)
end
exec_prepared(*args) click to toggle source

@example

con.prepare 'sql1', 'select * from mytable limit $1'
con.exec_prepared 'sql1', [100] do |rs|
    puts rs.fields.collect { |fname| "%-15s" % [fname] }.join('')
    rs.values.collect { |row|
        puts row.collect { |col| "%-15s" % [col] }.join('')
    }
end
Calls superclass method
# File lib/pgx.rb, line 188
# Wrapper around the superclass exec_prepared().
# args[0] is the statement name; sql_log_and_try and error_handlers rely on
# that position when a statement has to be re-prepared.
def exec_prepared(*args)
    parent_call = lambda { super }
    sql_log_and_try(parent_call, __method__, args)
end
finish(*args) click to toggle source
Calls superclass method
# File lib/pgx.rb, line 200
# Wrapper around the superclass finish().
# The parent call is deferred in a lambda (bare +super+ forwards the original
# arguments) and run by sql_log_and_try, which also receives the method name
# and the argument list.
def finish(*args)
    parent_call = lambda { super }
    sql_log_and_try(parent_call, __method__, args)
end
prepare(*args) click to toggle source

@example

con.prepare 'sql1', 'select * from mytable limit $1'
con.exec_prepared 'sql1', [100] do |rs|
    puts rs.fields.collect { |fname| "%-15s" % [fname] }.join('')
    rs.values.collect { |row|
        puts row.collect { |col| "%-15s" % [col] }.join('')
    }
end
Calls superclass method
# File lib/pgx.rb, line 166
# Prepares a statement through the superclass prepare() and records it.
#
# args[0] is the statement name and args[1] its SQL text.  prepare() can be
# reached for a statement that is already prepared (for instance when the
# call sits inside a transaction block that is being retried), so a
# statement marked live in @prepared_live is only logged, not prepared again.
#
# After a successful parent call the SQL is stored in @prepared_statements
# and the name is marked live in @prepared_live.
def prepare(*args)
    name = args[0]
    if @prepared_live.has_key?(name)
        return log_debug("prepared statement #{name} is already live, skipping re-preparing it")
    end

    parent_call = lambda { super }
    sql_log_and_try(parent_call, __method__, args)
    @prepared_statements[name] = args[1]
    @prepared_live[name] = true
end
query(*args) click to toggle source

Alias for exec

Calls superclass method
# File lib/pgx.rb, line 143
# Wrapper around the superclass query().
# Note that __method__ is :query here, so that is the name handed to
# sql_log_and_try.
def query(*args)
    parent_call = lambda { super }
    sql_log_and_try(parent_call, __method__, args)
end
reset(*args) click to toggle source
Calls superclass method
# File lib/pgx.rb, line 204
# Runs the superclass reset() through sql_log_and_try, then forgets which
# prepared statements were live and re-runs the connect_init proc with the
# stored connection handle.  Returns whatever the connect_init proc returns.
def reset(*args)
    parent_call = lambda { super }
    sql_log_and_try(parent_call, __method__, args)

    # nothing prepared earlier is marked live after a reset
    @prepared_live = {}
    @connect_init.call(@connection)
end
transaction(*args) click to toggle source

@example

con.transaction do
    con.exec "create temp table mytable(x text)"
    con.exec "insert into mytable(x) values('a')"
    con.exec "insert into mytable(x) values('b')"
end
Calls superclass method
# File lib/pgx.rb, line 129
# Runs the superclass transaction() through sql_log_and_try.
#
# While the block runs, @in_transaction_block is true so that
# method_in_transaction_block reports true for the other wrapped methods.
#
# The flag is cleared in an +ensure+ clause: if the transaction finally
# raises, the flag must not stay set, otherwise every later call on this
# connection would be treated as if it were still inside a transaction block.
#
# @return whatever sql_log_and_try returns for the wrapped call
# @raise re-raises whatever sql_log_and_try raises
def transaction(*args)
    @in_transaction_block = true
    begin
        sql_log_and_try(lambda { super }, __method__, args)
    ensure
        @in_transaction_block = false
    end
end
wait_for_notify(*args) click to toggle source
Calls superclass method
# File lib/pgx.rb, line 210
# Wrapper around the superclass wait_for_notify().
# The parent call is deferred in a lambda (bare +super+ forwards the original
# arguments) and run by sql_log_and_try, which also receives the method name
# and the argument list.
def wait_for_notify(*args)
    parent_call = lambda { super }
    sql_log_and_try(parent_call, __method__, args)
end

Private Instance Methods

arg_with_default(all_our_possible_args, our_args, symbol) click to toggle source

returns the value of an argument (symbol) from our_args, or if it's not defined, it returns the default value

# File lib/pgx.rb, line 218
# Looks up +symbol+ in +our_args+; when the key is absent (an explicit nil
# value still counts as present) the value stored under the same key in
# +all_our_possible_args+ is returned instead.
def arg_with_default(all_our_possible_args, our_args, symbol)
    our_args.fetch(symbol) { all_our_possible_args[symbol] }
end
connect_log_and_try(caller_super, our_args, parent_args) click to toggle source

log and retry the connect/new method

# File lib/pgx.rb, line 315
# Logs and (re)tries the connect/new call.
#
# +caller_super+ is a lambda that runs the parent constructor; its return
# value is stored in @connection and then passed to the connect_init proc.
# +our_args+ and +parent_args+ are used only to build the sanitized log line.
#
# On PG::Error the attempt counter is incremented and the error is logged.
# The error is re-raised when @connect_retry_sleep is nil or the message
# starts with FATAL.  Otherwise the method waits and retries:
# * a Proc is called with the number of failed attempts so far (1, 2, ...);
# * any Numeric (Integer or Float) is passed to sleep().
# The retry loop has no upper bound of its own.
def connect_log_and_try(caller_super, our_args, parent_args)
    try_count = 0
    all_args = merge_connect_args( sanitize_our_connect_args(our_args),
                                   sanitize_parent_connect_args(parent_args) )
    begin
        log_debug function_call_string('connect', all_args)
        @connection = caller_super.call # run the parent 'pg' function, new()
        log_debug 'calling connection initialization proc'
        @connect_init.call(@connection)
    rescue PG::Error => e
        try_count += 1
        log_error %Q{#{e.class}, #{e.message.chomp}}

        # just raise the error, do not retry to connect
        raise if @connect_retry_sleep.nil? || e.message =~ /^FATAL/

        case @connect_retry_sleep
        when Proc
            @connect_retry_sleep.call(try_count)
        when Numeric
            # Numeric rather than Integer: a fractional delay such as 0.5
            # would otherwise match no branch and retry with no pause at all.
            sleep(@connect_retry_sleep)
        end
        retry
    end
end
connect_loop() click to toggle source

Will not return until a database connection has been established.

# File lib/pgx.rb, line 248
# Invokes the connect proc stored in @connect_proc by the constructor and
# returns its result.
def connect_loop
    @connect_proc.()
end
connected?() click to toggle source

Returns true or false depending on whether the connection is OK or not. This has to be executed right after one of the above public methods, since it doesn't really ping the server on its own.

# File lib/pgx.rb, line 239
# Reports whether status() equals PG::CONNECTION_OK.
# A PG::Error raised while reading the status is treated as "not connected".
def connected?
    status() == PG::CONNECTION_OK
rescue PG::Error
    false
end
error_handlers(pgerr, caller_method, args) click to toggle source

special handling of certain error states (not in a transaction)

# File lib/pgx.rb, line 366
# Special handling of certain error states (not in a transaction).
#
# Currently only SQLSTATE 26000 is handled: when it was raised by
# exec_prepared, the statement named in args[0] is re-prepared via
# reprepare_statement.
#
# Nothing is done when the SQLSTATE cannot be determined, i.e. when the error
# carries no result object or reading the field raises PG::Error.  The nil
# check matters because calling error_field on nil would raise NoMethodError,
# which the PG::Error rescue does not catch and which would hide the
# original database error from the caller.
def error_handlers(pgerr, caller_method, args)
    begin
        result = pgerr.result
        return if result.nil? # no result attached to the error
        state = result.error_field(PG::Result::PG_DIAG_SQLSTATE)
    rescue PG::Error
        return # just return if we can't get the state; lost connection?
    end

    case state
    when '26000'
        reprepare_statement(args[0]) if caller_method.to_s == 'exec_prepared'
    end
end
extract_our_args(all_our_possible_args, args) click to toggle source

Extracts our arguments (those defined in all_our_possible_args) from args. Returns an array containing our arguments (a hash) and the arguments for the parent method.

# File lib/pgx.rb, line 225
# Splits the pgx-specific options off the leading option hash.
#
# When args[0] is a Hash, every key that also exists in
# +all_our_possible_args+ is moved into the returned +our_args+ hash (in the
# key order of +all_our_possible_args+).  The remaining pairs stay in
# position 0 of +args+; if nothing remains the leading element is dropped.
#
# The caller's hash is copied first, so the object passed in by the user is
# never modified (and may be frozen).  The +args+ array itself is still
# adjusted in place and returned.
#
# @return [Array] a two-element array: [our_args (Hash), args for the parent]
def extract_our_args(all_our_possible_args, args)
    our_args = {}
    if args[0].is_a?(Hash)
        remaining = args[0].dup
        all_our_possible_args.each_key do |key|
            our_args[key] = remaining.delete(key) if remaining.has_key?(key)
        end
        if remaining.empty?
            args.shift
        else
            args[0] = remaining
        end
    end
    [our_args, args]
end
function_call_string(func, args) click to toggle source

Given a function name string and an arguments array, it returns a string representing the function call. This is used in the logging functions.

# File lib/pgx.rb, line 255
# Builds the "Class.method(arg, arg, ...)" text used by the logging helpers.
#
# +args+ may be a single value or an array of values.  Every argument is
# rendered with inspect; an empty Hash is rendered as nothing.  Inspect does
# not expand \n, \r and \t, so those escape sequences are expanded afterwards
# to keep multi-line SQL readable in the log.
def function_call_string(func, args)
    arg_list = args.is_a?(Array) ? args : [args]
    rendered = arg_list.map do |arg|
        next nil if arg.is_a?(Hash) && arg.empty?
        arg.inspect.gsub('\n', "\n").gsub('\r', "\r").gsub('\t', "\t")
    end
    format('%s.%s(%s)', self.class, func.to_s, rendered.join(', '))
end
get_error_fields(pgerr) click to toggle source

error reporting helper

# File lib/pgx.rb, line 423
# Error reporting helper: renders every field listed in
# PGx::ERROR_FIELDCODES as "name=value", joined with ", ".
#
# A field is rendered as "?" when it cannot be read: either the error
# carries no result object, or reading the field raises PG::Error.  The nil
# check matters because calling error_field on nil would raise NoMethodError
# from inside the caller's rescue handler and hide the original error.
def get_error_fields(pgerr)
    PGx.const_get('ERROR_FIELDCODES').map { |key, value|
        begin
            result = pgerr.result
            errval = result.nil? ? '?' : result.error_field(value)
        rescue PG::Error
            errval = '?'
        end
        %Q{#{key}=#{errval}}
    }.join(', ')
end
get_error_message(pgerr) click to toggle source

error reporting helper

# File lib/pgx.rb, line 435
# Error reporting helper: returns the error message without a trailing
# newline.
#
# The message is read from the result object attached to the error.  When
# the error carries no result, the exception's own message is used instead;
# without this check, calling error_message on nil would raise NoMethodError
# from inside the caller's rescue handler and hide the original error.
# If reading the message raises PG::Error, "?" is returned.
def get_error_message(pgerr)
    begin
        result = pgerr.result
        errval = result.nil? ? pgerr.message : result.error_message
    rescue PG::Error
        errval = '?'
    end
    errval.to_s.chomp
end
log_debug(str) click to toggle source

logging helper

# File lib/pgx.rb, line 460
# Logging helper: sends +str+ at debug level to every logger in @logger.
# The message is passed as a block to each logger's debug method.
def log_debug(str)
    @logger.each do |sink|
        sink.debug { str }
    end
end
log_error(str) click to toggle source

logging helper

# File lib/pgx.rb, line 445
# Logging helper: sends +str+ at error level to every logger in @logger.
# The message is passed as a block to each logger's error method.
def log_error(str)
    @logger.each do |sink|
        sink.error { str }
    end
end
log_info(str) click to toggle source

logging helper

# File lib/pgx.rb, line 455
# Logging helper: sends +str+ at info level to every logger in @logger.
# The message is passed as a block to each logger's info method.
def log_info(str)
    @logger.each do |sink|
        sink.info { str }
    end
end
log_warn(str) click to toggle source

logging helper

# File lib/pgx.rb, line 450
# Logging helper: sends +str+ at warn level to every logger in @logger.
# The message is passed as a block to each logger's warn method.
def log_warn(str)
    @logger.each do |sink|
        sink.warn { str }
    end
end
merge_connect_args(our_args, parent_args) click to toggle source

merge as follows:

hash and hash -> hash
hash and array -> array
# File lib/pgx.rb, line 302
# Combines our (pgx) arguments with the parent arguments for logging.
#
# * no pgx arguments        -> the parent arguments as given
# * no parent arguments     -> the pgx hash as given
# * parent is a single Hash -> one merged Hash (parent values win on clashes)
# * anything else           -> an Array: the pgx hash followed by the parent
#                              arguments
def merge_connect_args(our_args, parent_args)
    return parent_args if our_args.empty?
    return our_args if parent_args.empty?

    hash_style = parent_args.length == 1 && parent_args[0].is_a?(Hash)
    hash_style ? our_args.merge(parent_args[0]) : [our_args].concat(parent_args)
end
method_in_transaction_block(method) click to toggle source

return true if any method call, except transaction(), is called from within a transaction() block

# File lib/pgx.rb, line 395
# Tells whether +method+ is being called from inside a transaction() block.
# transaction() itself never counts; for any other method the current value
# of @in_transaction_block is returned.
def method_in_transaction_block(method)
    return false if method.to_s == 'transaction'
    @in_transaction_block
end
reprepare_statement(name) click to toggle source
# File lib/pgx.rb, line 386
# Prepares the statement +name+ again, using the SQL remembered in
# @prepared_statements.  Names that were never recorded there are ignored
# (the method then returns nil).
def reprepare_statement(name)
    return unless @prepared_statements.has_key?(name)

    log_debug "Re-preparing statement '#{name}'"
    prepare(name, @prepared_statements[name])
end
sanitize_our_connect_args(arg_hash) click to toggle source

Sanitizes the values of certain pgx-specific arguments so they don't clog the logs. Returns the sanitized argument hash.

# File lib/pgx.rb, line 272
# Returns a copy of +arg_hash+ that is suitable for logging: the value of
# :logger is replaced by '...' unless it is a Proc; every other entry is
# copied unchanged.  The input hash is not modified.
def sanitize_our_connect_args(arg_hash)
    arg_hash.each_with_object({}) do |(name, value), cleaned|
        hide = (name == :logger) && !value.is_a?(Proc)
        cleaned[name] = hide ? '...' : value
    end
end
sanitize_parent_connect_args(parent_args) click to toggle source

sanitize parent arguments, eg: blank the password so it's not in the logs

# File lib/pgx.rb, line 284
# Returns a copy of the parent connect arguments with the password blanked
# out ('...') so it does not end up in the logs.  The input is not modified.
#
# Handled argument styles:
# * Hash   - the entry whose key is :password or 'password'
# * String - "password=..." in a keyword/value string, including a
#            single-quoted value that contains spaces, and the password part
#            of a "scheme://user:password@host" URI
# * positional arguments (more than one) - the 7th argument (index 6)
# A single argument of any other type is returned unchanged.
def sanitize_parent_connect_args(parent_args)
    return parent_args if parent_args.empty?

    if parent_args.length == 1
        arg = parent_args[0]
        case arg
        when Hash # hash method
            [ arg.merge(arg) { |k, ov| k.to_s == 'password' ? '...' : ov } ]
        when String # string method
            # URI style first, then keyword/value style; a quoted value is
            # consumed up to its closing quote so no part of it is left behind.
            masked = arg.sub(%r{\A(\s*[a-z][a-z0-9+.\-]*://[^:@/\s]*:)[^@/\s]*@}i, '\1...@')
            [ masked.gsub(/(password\s*=\s*)(?:'(?:[^'\\]|\\.)*'|\S+)/, '\1...') ]
        else
            parent_args # nothing we know how to sanitize
        end
    else # positional arguments method
        [ parent_args.map.with_index { |x, i| i == 6 ? '...' : x } ]
    end
end
sql_log_and_try(caller_super, caller_method, args) click to toggle source

log and retry statement related methods

# File lib/pgx.rb, line 339
# Logs and retries the statement-related methods.
#
# caller_super  - lambda that runs the parent ('pg') implementation
# caller_method - name of the wrapped method (Symbol or String); used for
#                 logging and to decide how errors are handled
# args          - the wrapped method's argument list; args[0] is taken to be
#                 the statement name when caller_method is exec_prepared
#
# Returns the value of caller_super.call when it succeeds.
#
# On PG::Error:
# * still connected, not inside a transaction block: the error is logged,
#   error_handlers gets a chance to repair the situation, and the call is
#   retried if sql_retriable? says so; otherwise the error is re-raised.
# * still connected, inside a transaction block: the error is re-raised
#   without logging or retrying (the enclosing transaction() call handles
#   that).  For exec_prepared the statement name is remembered in
#   @transaction_reprepare first.
# * not connected: the handle is closed via PG_close, connect_loop() is run
#   and the call is retried.
# NOTE(review): the not-connected branch retries the single call even when
# it was issued inside a transaction block -- confirm that this is intended.
def sql_log_and_try(caller_super, caller_method, args)
    try_count = 0
    begin                                     # re-prepare prepared statements after a failed transaction
        transaction_reprepare(caller_method)  # just prior to retrying the transaction
        log_debug function_call_string(caller_method, args)
        caller_super.call # run the parent 'pg' function
    rescue PG::Error => e
        if connected? # if we are connected, it's an SQL related error
            if not method_in_transaction_block(caller_method)                # do not log errors and retry
                try_count = try_count + 1                                    # methods within a transaction
                log_error(get_error_fields(e) + "\n" + get_error_message(e)) # block because the transaction
                error_handlers(e, caller_method, args)                       # itself will log the error and
                retry if sql_retriable?(e, try_count)                        # retry the transaction block
            elsif caller_method.to_s == 'exec_prepared' # exec_prepared failed within a transaction
                @transaction_reprepare = args[0]        # make a note to re-prepare before retrying
            end                                         # the transaction
        else # not connected
            log_warn 'bad database connection'
            PG_close() # close so we don't leak connections
            connect_loop()
            # this retry does not touch try_count
            retry
        end
        raise # raise to caller if we can't retry
    end
end
sql_retriable?(pgerr, try_count) click to toggle source

determine if a single statement or transaction block that resulted in an error can be retried

# File lib/pgx.rb, line 401
# Determines whether a single statement or transaction block that resulted
# in an error can be retried.
#
# pgerr     - the rescued error; its result object supplies the SQLSTATE
# try_count - number of attempts made so far
#
# Returns true when another attempt is allowed, false otherwise:
# * SQLSTATE listed in @method_retriable_error_states: retriable while
#   try_count is below @method_try_count_max.
# * SQLSTATE not listed: never retriable.
# * SQLSTATE unavailable (the error carries no result object, or reading the
#   field raises PG::Error): retriable, subject to the same try limit.  The
#   limit keeps a persistent failure from being retried forever, and the nil
#   check avoids a NoMethodError that would hide the original error.
def sql_retriable?(pgerr, try_count)
    state = nil
    state_known = false
    begin
        result = pgerr.result
        unless result.nil?
            state = result.error_field(PG::Result::PG_DIAG_SQLSTATE)
            state_known = true
        end
    rescue PG::Error
        state_known = false # can't get the state; lost connection?
    end

    tries_left = try_count < @method_try_count_max

    unless state_known
        log_debug('Error state is not available, ' +
                  (tries_left ? 'retrying...' : 'and the upper limit of tries has been reached'))
        return tries_left
    end

    return false unless @method_retriable_error_states.include?(state)

    if tries_left
        log_debug(%Q{State #{state} is retriable, retrying...})
    else
        log_debug(%Q{State #{state} is retriable, but the upper limit (#{
            @method_try_count_max}) of tries has been reached})
    end
    tries_left
end
transaction_reprepare(caller_method) click to toggle source
# File lib/pgx.rb, line 379
# Re-prepares the statement noted in @transaction_reprepare, if any, and
# clears the note.  Only acts when +caller_method+ is transaction.
def transaction_reprepare(caller_method)
    return unless caller_method.to_s == 'transaction'
    return if @transaction_reprepare.nil?

    reprepare_statement(@transaction_reprepare)
    @transaction_reprepare = nil
end