class PGx::Connection
@author Kiriakos Georgiou, www.mockbites.com/about Extends the base class PG::Connection methods by adding logging and retrying capabilities (synchronous methods only.) The arguments and method behaviors are identical, except for the 'new' method. @see www.mockbites.com/articles/tech/pgx pgx documentation @see deveiate.org/code/pg/PG/Connection.html Base class methods
Public Class Methods
@param [Logger] logger: a logger object, or a lambda returning the
logger object. Note that a lambda returning the logger is preferable because it makes logging of the log object itself easier on the eyes. This parameter can be a single logger object, or an array of logger objects in case you want to log to multiple logs (eg: stdout and a log file.)
@param [Proc] connect_init: a Proc that contains code that will
always be executed every time a new connection is established. It is also executed after a reset(). May be useful for things such as setting the search_path.
@param [Proc, Integer] connect_retry_sleep: a Proc or Integer.
If a Proc is passed, the Proc will be called with one argument, try_count, which starts at 1 and increments by one with every unsuccessful connection attempt. The Proc should implement the algorithm for sleeping between connection attempts. If an Integer is passed, it indicates how long to sleep between connection attempts, in seconds. If nil is passed, no attempts to reconnect will be made; an exception is simply raised.
@param [Integer] method_try_count_max: the maximum number of times a
single method, or transaction block, can be tried
@param [Array<String>] method_retriable_error_states: the set of
error states for which a single method, or transaction block, should be retried
@param … The rest of the arguments are the same as the base
class method arguments
@see deveiate.org/code/pg/PG/Connection.html#method-c-new
Base class method.
@example
log = Logger.new('/tmp/debug.log'); log.level = Logger::DEBUG connect_sleeper = lambda { |try_count| sleep [try_count ** 2, 120].min } con = PGx::Connection.new( logger: log, connect_retry_sleep: connect_sleeper, dbname: 'test' )
# File lib/pgx.rb, line 88
# Stores the pgx-specific options (falling back to the defaults held in
# PGx::CONNECT_ARGS), initializes the bookkeeping used for retries and
# prepared statements, and then connects through connect_loop().
#
# @param args the pgx-specific options mixed with the arguments that are
#   handed on to the parent class constructor
def initialize(*args)
  connect_args = PGx.const_get('CONNECT_ARGS')
  our_args, parent_args = extract_our_args(connect_args, args)

  log = arg_with_default(connect_args, our_args, :logger)
  # a single logger, an array of loggers, or a lambda returning either
  @logger = [log.is_a?(Proc) ? log.call : log].flatten
  @connect_init = arg_with_default(connect_args, our_args, :connect_init)
  @connect_retry_sleep = arg_with_default(connect_args, our_args, :connect_retry_sleep)
  @method_try_count_max = arg_with_default(connect_args, our_args, :method_try_count_max)

  # Work on a copy: the value is either the caller's own array or the array
  # stored in CONNECT_ARGS, and neither should be modified by the push below.
  @method_retriable_error_states =
    arg_with_default(connect_args, our_args, :method_retriable_error_states).dup
  # always include invalid_sql_statement_name (required for prepared
  # statements to work after re-connects)
  @method_retriable_error_states.push('26000') unless @method_retriable_error_states.include?('26000')

  # gets set in connect_log_and_try() to the connection handle
  # needed by reset() to be able to execute the connect_init proc
  @connection = nil

  @in_transaction_block = false
  @prepared_statements = {}    # hash of all statements ever prepared, key = statement name, value = sql
  @prepared_live = {}          # keeps track which prepared statements are currently prepared
  @transaction_reprepare = nil # placeholder for statement to be prepared before transaction retry

  @connect_proc = lambda {
    @prepared_live = {} # reset on connect
    connect_log_and_try(
      lambda { super(*parent_args) },
      our_args,
      parent_args
    )
  }
  connect_loop()
end
Public Instance Methods
# File lib/pgx.rb, line 192
# Runs the parent class's cancel via sql_log_and_try, handing it a lambda
# that invokes super, this method's name, and the received arguments.
def cancel(*args)
  parent_call = -> { super }
  sql_log_and_try(parent_call, __method__, args)
end
# File lib/pgx.rb, line 196
# Runs the parent class's close via sql_log_and_try, handing it a lambda
# that invokes super, this method's name, and the received arguments.
def close(*args)
  parent_call = -> { super }
  sql_log_and_try(parent_call, __method__, args)
end
@example
con.exec "insert into mytable(x) values('test value')" con.exec 'insert into mytable(x) values($1)', ['test value']
# File lib/pgx.rb, line 138
# Runs the parent class's exec via sql_log_and_try, handing it a lambda
# that invokes super, this method's name, and the received arguments.
def exec(*args)
  parent_call = -> { super }
  sql_log_and_try(parent_call, __method__, args)
end
@example
con.exec_params 'select * from mytable where x = $1 limit $2', [1, 100] do |rs| puts rs.fields.collect { |fname| "%-15s" % [fname] }.join('') rs.values.collect { |row| puts row.collect { |col| "%-15s" % [col] }.join('') } end
# File lib/pgx.rb, line 154
# Runs the parent class's exec_params via sql_log_and_try, handing it a
# lambda that invokes super, this method's name, and the received arguments.
def exec_params(*args)
  parent_call = -> { super }
  sql_log_and_try(parent_call, __method__, args)
end
@example
con.prepare 'sql1', 'select * from mytable limit $1' con.exec_prepared 'sql1', [100] do |rs| puts rs.fields.collect { |fname| "%-15s" % [fname] }.join('') rs.values.collect { |row| puts row.collect { |col| "%-15s" % [col] }.join('') } end
# File lib/pgx.rb, line 188
# Runs the parent class's exec_prepared via sql_log_and_try, handing it a
# lambda that invokes super, this method's name, and the received arguments.
def exec_prepared(*args)
  parent_call = -> { super }
  sql_log_and_try(parent_call, __method__, args)
end
# File lib/pgx.rb, line 200
# Runs the parent class's finish via sql_log_and_try, handing it a lambda
# that invokes super, this method's name, and the received arguments.
def finish(*args)
  parent_call = -> { super }
  sql_log_and_try(parent_call, __method__, args)
end
@example
con.prepare 'sql1', 'select * from mytable limit $1' con.exec_prepared 'sql1', [100] do |rs| puts rs.fields.collect { |fname| "%-15s" % [fname] }.join('') rs.values.collect { |row| puts row.collect { |col| "%-15s" % [col] }.join('') } end
# File lib/pgx.rb, line 166
# Prepares a statement through the parent class and records it, unless a
# statement with that name is already marked as live.
#
# prepare() can be called on a statement that is already prepared when the
# call sits inside a transaction block and the transaction is retried, hence
# the liveness check.
#
# args[0] is the statement name and args[1] is stored as its SQL.
def prepare(*args)
  stmt_name = args[0]
  if @prepared_live.key?(stmt_name)
    return log_debug(%Q{prepared statement #{stmt_name} is already live, skipping re-preparing it})
  end

  sql_log_and_try(lambda { super }, __method__, args)
  @prepared_statements[stmt_name] = args[1]
  @prepared_live[stmt_name] = true
end
Alias for exec
# File lib/pgx.rb, line 143
# Runs the parent class's query via sql_log_and_try, handing it a lambda
# that invokes super, this method's name, and the received arguments.
def query(*args)
  parent_call = -> { super }
  sql_log_and_try(parent_call, __method__, args)
end
# File lib/pgx.rb, line 204
# Runs the parent class's reset via sql_log_and_try, then clears the record
# of live prepared statements and calls the connect_init proc with the
# stored connection handle. Returns what the connect_init proc returns.
def reset(*args)
  parent_reset = -> { super }
  sql_log_and_try(parent_reset, __method__, args)
  @prepared_live = Hash.new # reset on connect
  @connect_init.call(@connection)
end
@example
con.transaction do con.exec "create temp table mytable(x text)" con.exec "insert into mytable(x) values('a')" con.exec "insert into mytable(x) values('b')" end
# File lib/pgx.rb, line 129
# Runs the parent class's transaction via sql_log_and_try while flagging
# that a transaction block is in progress.
#
# The flag is cleared in an ensure clause so that it is reset even when the
# transaction raises; otherwise every later call would be treated as running
# inside a transaction block (see method_in_transaction_block).
#
# Returns whatever sql_log_and_try returns for the transaction call.
def transaction(*args)
  @in_transaction_block = true
  sql_log_and_try(lambda { super }, __method__, args)
ensure
  @in_transaction_block = false
end
# File lib/pgx.rb, line 210
# Runs the parent class's wait_for_notify via sql_log_and_try, handing it a
# lambda that invokes super, this method's name, and the received arguments.
def wait_for_notify(*args)
  parent_call = -> { super }
  sql_log_and_try(parent_call, __method__, args)
end
Private Instance Methods
returns the value of an argument (symbol) from our_args, or if it's not defined, it returns the default value
# File lib/pgx.rb, line 218
# Looks up symbol in our_args; when the key is absent, the value stored
# under the same key in all_our_possible_args is returned instead.
# A key that is present with a nil value counts as present.
def arg_with_default(all_our_possible_args, our_args, symbol)
  our_args.fetch(symbol) { all_our_possible_args[symbol] }
end
log and retry the connect/new method
# File lib/pgx.rb, line 315
# Logs and runs the connect/new call, retrying after a PG::Error.
#
# caller_super - lambda that runs the parent 'pg' function, new()
# our_args     - the pgx-specific arguments (sanitized before logging)
# parent_args  - the parent class arguments (sanitized before logging)
#
# After a failed attempt the error is logged and then:
# * re-raised when @connect_retry_sleep is nil or the message has a line
#   starting with FATAL;
# * otherwise @connect_retry_sleep is called with the attempt count (Proc)
#   or slept on (any Numeric, so fractional seconds are honored instead of
#   retrying with no delay at all), and the connection is tried again.
#
# Returns the result of the connect_init proc.
def connect_log_and_try(caller_super, our_args, parent_args)
  try_count = 0
  all_args = merge_connect_args(
    sanitize_our_connect_args(our_args),
    sanitize_parent_connect_args(parent_args)
  )
  begin
    log_debug function_call_string('connect', all_args)
    @connection = caller_super.call # run the parent 'pg' function, new()
    log_debug 'calling connection initialization proc'
    @connect_init.call(@connection)
  rescue PG::Error => e
    try_count += 1
    log_error %Q{#{e.class}, #{e.message.chomp}}
    # just raise the error, do not retry to connect
    raise if @connect_retry_sleep.nil? || e.message =~ /^FATAL/

    if @connect_retry_sleep.is_a?(Proc)
      @connect_retry_sleep.call(try_count)
    elsif @connect_retry_sleep.is_a?(Numeric)
      sleep(@connect_retry_sleep)
    end
    retry
  end
end
Will not return until a database connection has been established.
# File lib/pgx.rb, line 248
# Invokes the stored connect proc and returns its result.
def connect_loop
  @connect_proc.()
end
Returns true or false depending on whether the connection is OK or not. This has to be executed right after one of the above public methods, since it doesn't really ping the server on its own.
# File lib/pgx.rb, line 239
# True when status() reports PG::CONNECTION_OK; false for any other status
# or when asking for the status raises a PG::Error.
def connected?
  status == PG::CONNECTION_OK
rescue PG::Error
  false
end
special handling of certain error states (not in a transaction)
# File lib/pgx.rb, line 366
# Special handling of certain error states (not in a transaction).
#
# pgerr         - the rescued error; its result supplies the SQLSTATE
# caller_method - name of the method that failed
# args          - arguments of that method; args[0] is the statement name
#                 when caller_method is exec_prepared
#
# Returns without doing anything when the error carries no result object or
# the state cannot be read. Without the nil check a result-less error would
# surface as a NoMethodError raised from inside the caller's rescue clause.
def error_handlers(pgerr, caller_method, args)
  begin
    result = pgerr.result
    return if result.nil? # nothing to inspect
    state = result.error_field(PG::Result::PG_DIAG_SQLSTATE)
  rescue PG::Error
    return # just return if we can't get the state; lost connection?
  end

  case state
  when '26000'
    reprepare_statement(args[0]) if caller_method.to_s == 'exec_prepared'
  end
end
Extracts our arguments, as defined in all_our_possible_args, from args. Returns an array containing our arguments (hash) and the arguments for the parent method.
# File lib/pgx.rb, line 225
# Splits args into the pgx-specific options and the arguments meant for the
# parent method.
#
# all_our_possible_args - hash whose keys name the pgx-specific options
# args                  - the argument list; only a leading Hash is examined
#
# Returns [our_args, parent_args]. our_args holds the recognized keys, in
# the key order of all_our_possible_args. parent_args is args without those
# keys; the leading hash is dropped entirely when nothing is left in it.
#
# Neither args nor the hash inside it is modified, so a caller that passes
# its own options hash can safely reuse it for another connection.
def extract_our_args(all_our_possible_args, args)
  options = args[0]
  return [{}, args] unless options.is_a?(Hash)

  our_args = {}
  all_our_possible_args.each_key do |key|
    our_args[key] = options[key] if options.key?(key)
  end

  parent_options = options.reject { |key, _value| our_args.key?(key) }
  remaining = args.drop(1)
  parent_args = parent_options.empty? ? remaining : [parent_options] + remaining
  [our_args, parent_args]
end
Given a function name string and an arguments array, it returns a string representing the function call. This is used in the logging functions.
# File lib/pgx.rb, line 255
# Builds the "Class.func(arg, arg, ...)" text used by the logging helpers.
#
# func - the function name (anything responding to to_s)
# args - an Array of arguments, or a single argument
#
# Each argument is rendered with inspect; an empty Hash is rendered as an
# empty string.
def function_call_string(func, args)
  arg_list = args.is_a?(Array) ? args : [args]
  rendered = arg_list.map do |arg|
    next nil if arg.is_a?(Hash) && arg.empty?

    # Inspect does not expand \n, \r and \t, so we expand them ourselves
    # in order to get more readable logging output.
    arg.inspect.gsub('\n', "\n").gsub('\r', "\r").gsub('\t', "\t")
  end
  "#{self.class}.#{func}(#{rendered.join(', ')})"
end
error reporting helper
# File lib/pgx.rb, line 423
# Error reporting helper: renders every field listed in
# PGx::ERROR_FIELDCODES as "name=value", joined with ", ".
#
# A field is shown as '?' when reading it raises a PG::Error or when the
# error carries no result object at all. Without the nil check a result-less
# error would raise NoMethodError while the original error is being logged.
def get_error_fields(pgerr)
  PGx.const_get('ERROR_FIELDCODES').map { |key, value|
    begin
      result = pgerr.result
      errval = result.nil? ? '?' : result.error_field(value)
    rescue PG::Error
      errval = '?'
    end
    %Q{#{key}=#{errval}}
  }.join(', ')
end
error reporting helper
# File lib/pgx.rb, line 435
# Error reporting helper: returns the error message without its trailing
# newline.
#
# The message comes from the result attached to the error. When the error
# carries no result object, the exception's own message is used instead of
# failing with NoMethodError. '?' is returned when reading the message
# raises a PG::Error.
def get_error_message(pgerr)
  begin
    result = pgerr.result
    errval = result.nil? ? pgerr.message.to_s : result.error_message
  rescue PG::Error
    errval = '?'
  end
  errval.chomp
end
logging helper
# File lib/pgx.rb, line 460
# Sends str, wrapped in a block, to the debug method of every configured
# logger.
def log_debug(str)
  @logger.each do |sink|
    sink.debug { str }
  end
end
logging helper
# File lib/pgx.rb, line 445
# Sends str, wrapped in a block, to the error method of every configured
# logger.
def log_error(str)
  @logger.each do |sink|
    sink.error { str }
  end
end
logging helper
# File lib/pgx.rb, line 455
# Sends str, wrapped in a block, to the info method of every configured
# logger.
def log_info(str)
  @logger.each do |sink|
    sink.info { str }
  end
end
logging helper
# File lib/pgx.rb, line 450
# Sends str, wrapped in a block, to the warn method of every configured
# logger.
def log_warn(str)
  @logger.each do |sink|
    sink.warn { str }
  end
end
merge as follows:
hash and hash -> hash; hash and array -> array
# File lib/pgx.rb, line 302
# Combines the pgx-specific arguments with the parent arguments for logging.
#
# * our_args empty               -> parent_args as is
# * parent_args empty            -> our_args as is
# * parent_args is a single Hash -> one merged hash (connection hash style)
# * anything else                -> array of our_args followed by parent_args
def merge_connect_args(our_args, parent_args)
  return parent_args if our_args.empty?
  return our_args if parent_args.empty?

  single_hash = parent_args.length == 1 && parent_args[0].is_a?(Hash)
  single_hash ? our_args.merge(parent_args[0]) : [our_args].concat(parent_args)
end
return true if any method call, except transaction(), is called from within a transaction() block
# File lib/pgx.rb, line 395
# Tells whether method is being called from within a transaction() block.
# Always false for transaction itself; for any other method the current
# value of @in_transaction_block is returned.
def method_in_transaction_block(method)
  return false if method.to_s == 'transaction'

  @in_transaction_block
end
# File lib/pgx.rb, line 386
# Prepares the named statement again, using the SQL recorded for it in
# @prepared_statements. Does nothing when no statement of that name was
# recorded.
def reprepare_statement(name)
  return unless @prepared_statements.key?(name)

  log_debug "Re-preparing statement '#{name}'"
  prepare(name, @prepared_statements[name])
end
Sanitizes the values for certain pgx-specific arguments so they don't clog the logs. Returns the sanitized arg hash.
# File lib/pgx.rb, line 272
# Returns a copy of arg_hash that is suitable for logging: the :logger value
# is replaced by '...' unless it is a Proc; every other entry is kept as is.
def sanitize_our_connect_args(arg_hash)
  arg_hash.each_with_object({}) do |(name, value), cleaned|
    hide = (name == :logger) && !value.is_a?(Proc)
    cleaned[name] = hide ? '...' : value
  end
end
sanitize parent arguments, eg: blank the password so it's not in the logs
# File lib/pgx.rb, line 284
# Returns a copy of the parent connect arguments that is safe to log, with
# the password blanked out.
#
# * empty list: returned as is.
# * single Hash: the value stored under the password key is replaced,
#   whether the key is the symbol :password or the string 'password'.
# * single String: the value of a password=... setting is replaced,
#   including a single-quoted value that contains spaces, and so is the
#   password part of a scheme://user:password@host style string.
# * any other single argument: returned as is (previously nil was returned,
#   which the caller cannot merge).
# * several positional arguments: the element at index 6 is replaced
#   (presumably the password position of the parent's positional form) and
#   the list is wrapped in an outer array.
def sanitize_parent_connect_args(parent_args)
  return parent_args if parent_args.empty?

  if parent_args.length > 1 # positional arguments method
    return [parent_args.map.with_index { |x, i| i == 6 ? '...' : x }]
  end

  arg = parent_args[0]
  case arg
  when Hash # hash method
    [arg.merge(arg) { |k, ov| k.to_s == 'password' ? '...' : ov }]
  when String # string method
    masked = arg.gsub(/(password\s*=\s*)(?:'(?:[^'\\]|\\.)*'|\S+)/, '\1...')
    [masked.gsub(%r{\A(\w+://[^/@:\s]*:)[^/@\s]*(?=@)}, '\1...')]
  else
    parent_args
  end
end
log and retry statement related methods
# File lib/pgx.rb, line 339 def sql_log_and_try(caller_super, caller_method, args) try_count = 0 begin # re-prepare prepared statements after a failed transaction transaction_reprepare(caller_method) # just prior to retrying the transaction log_debug function_call_string(caller_method, args) caller_super.call # run the parent 'pg' function rescue PG::Error => e if connected? # if we are connected, it's an SQL related error if not method_in_transaction_block(caller_method) # do not log errors and retry try_count = try_count + 1 # methods within a transaction log_error(get_error_fields(e) + "\n" + get_error_message(e)) # block because the transaction error_handlers(e, caller_method, args) # itself will log the error and retry if sql_retriable?(e, try_count) # retry the transaction block elsif caller_method.to_s == 'exec_prepared' # exec_prepared failed within a transaction @transaction_reprepare = args[0] # make a note to re-prepare before retrying end # the transaction else # not connected log_warn 'bad database connection' PG_close() # close so we don't leak connections connect_loop() retry end raise # raise to caller if we can't retry end end
determine if a single statement or transaction block that resulted in an error can be retried
# File lib/pgx.rb, line 401
# Determines whether a single statement or transaction block that resulted
# in an error can be retried.
#
# pgerr     - the rescued error; its result supplies the SQLSTATE
# try_count - how many times the call has been tried so far
#
# Returns true when the state is one of @method_retriable_error_states and
# try_count is still below @method_try_count_max, and also when reading the
# state raises a PG::Error. Returns false otherwise, including when the
# error carries no result object: there is no state to judge it by, and
# without the nil check the caller would get a NoMethodError instead of the
# original error.
def sql_retriable?(pgerr, try_count)
  begin
    result = pgerr.result
    return false if result.nil?
    state = result.error_field(PG::Result::PG_DIAG_SQLSTATE)
  rescue PG::Error
    return true # retry if we can't get the state; lost connection?
  end

  return false unless @method_retriable_error_states.include?(state)

  if try_count < @method_try_count_max
    log_debug(%Q{State #{state} is retriable, retrying...})
    true
  else
    log_debug(%Q{State #{state} is retriable, but the upper limit (#{@method_try_count_max}) of tries has been reached})
    false
  end
end
# File lib/pgx.rb, line 379
# When called for the transaction method and a statement name has been noted
# in @transaction_reprepare, re-prepares that statement and clears the note.
# Does nothing otherwise.
def transaction_reprepare(caller_method)
  return unless caller_method.to_s == 'transaction'
  return if @transaction_reprepare.nil?

  reprepare_statement(@transaction_reprepare)
  @transaction_reprepare = nil
end