class QC::ConnAdapter
Attributes
connection[RW]
Public Class Methods
new(c=nil)
click to toggle source
# File lib/queue_classic/conn_adapter.rb, line 8
# Builds the adapter around a connection.
#
# c - an existing connection to wrap; when nil (the default) a new
#     connection is obtained from establish_new. A non-nil value is
#     passed through validate! before being stored.
#
# Also creates a fresh Mutex and stores it in @mutex.
def initialize(c=nil)
  @connection =
    if c.nil?
      establish_new
    else
      validate!(c)
    end
  @mutex = Mutex.new
end
Public Instance Methods
disconnect()
click to toggle source
# File lib/queue_classic/conn_adapter.rb, line 41
# Closes the wrapped connection while holding @mutex.
#
# Any StandardError raised by the close is reported through QC.log
# and is not re-raised.
def disconnect
  @mutex.synchronize do
    begin
      @connection.close
    rescue => failure
      QC.log(at: 'disconnect', error: failure.message)
    end
  end
end
execute(stmt, *params)
click to toggle source
# File lib/queue_classic/conn_adapter.rb, line 13
# Runs a SQL statement on the wrapped connection while holding @mutex.
#
# stmt   - the SQL text to run (its inspect form is logged first).
# params - optional values sent along with the statement.
#
# Returns nil when the result has no rows, the single row when it has
# exactly one, and an Array of all rows when it has more than one.
#
# Raises PG::Error: the error is logged, the connection is reset and
# the same error is re-raised.
def execute(stmt, *params)
  @mutex.synchronize do
    QC.log(:at => "exec_sql", :sql => stmt.inspect)
    begin
      # Passing bind values to PG::Connection#exec is deprecated in the pg
      # gem; statements with values go through exec_params instead.
      r = if params.empty?
        @connection.exec(stmt)
      else
        @connection.exec_params(stmt, params)
      end
      result = r.to_a
      # Callers rely on a lone row being returned unwrapped.
      result.length > 1 ? result : result.pop
    rescue PG::Error => e
      QC.log(:error => e.inspect)
      @connection.reset
      raise
    end
  end
end
server_version()
click to toggle source
# File lib/queue_classic/conn_adapter.rb, line 51
# Returns the value of "SHOW server_version_num;" converted with to_i.
#
# The query goes through execute; a truthy result is cached in
# @server_version so later calls skip the query.
def server_version
  @server_version ||= begin
    reported = execute("SHOW server_version_num;")["server_version_num"]
    reported ? reported.to_i : reported
  end
end
wait(time, *channels)
click to toggle source
# File lib/queue_classic/conn_adapter.rb, line 30
# Listens on the given channels, waits for a notification, then stops
# listening and drains what is left. Runs while holding @mutex.
#
# time     - passed unchanged to wait_for_notify.
# channels - channel names; each is converted with to_s and sent as a
#            double-quoted identifier.
#
# Returns whatever drain_notify returns.
def wait(time, *channels)
  @mutex.synchronize do
    # Inside a double-quoted SQL identifier a literal double quote is
    # written as two; without this a name containing '"' would end the
    # identifier early and corrupt the statement.
    idents = channels.map { |c| '"' + c.to_s.gsub('"', '""') + '"' }
    @connection.exec(idents.map { |ident| "LISTEN #{ident}" }.join(';'))
    wait_for_notify(time)
    @connection.exec(idents.map { |ident| "UNLISTEN #{ident}" }.join(';'))
    drain_notify
  end
end
Private Instance Methods
db_url()
click to toggle source
# File lib/queue_classic/conn_adapter.rb, line 102
# Returns the database URL as a parsed URI.
#
# The URL is read from ENV["QC_DATABASE_URL"], falling back to
# ENV["DATABASE_URL"]. The parsed result is kept in @db_url and reused
# on later calls.
#
# Raises ArgumentError when neither variable is set.
def db_url
  cached = defined?(@db_url) ? @db_url : nil
  return cached if cached

  raw = ENV["QC_DATABASE_URL"] || ENV["DATABASE_URL"]
  raise(ArgumentError, "missing QC_DATABASE_URL or DATABASE_URL") unless raw

  @db_url = URI.parse(raw)
end
drain_notify()
click to toggle source
# File lib/queue_classic/conn_adapter.rb, line 66
# Calls @connection.notifies repeatedly until it returns nil, logging
# one "drain_notifications" entry for every non-nil value it returned.
def drain_notify
  QC.log(at: "drain_notifications") until @connection.notifies.nil?
end
establish_new()
click to toggle source
# File lib/queue_classic/conn_adapter.rb, line 78
# Opens a new connection with PG.connect, using the arguments that
# normalize_db_url derives from db_url, and sets its application_name
# to QC.app_name.
#
# A connection whose status is not PG::CONNECTION_OK has its error
# logged; the method still goes on to run the SET statement.
#
# Returns the new connection.
def establish_new
  QC.log(:at => "establish_conn")
  conn = PG.connect(*normalize_db_url(db_url))
  if conn.status != PG::CONNECTION_OK
    QC.log(:error => conn.error)
  end
  # The name is placed inside a SQL string literal, so escape it: an
  # apostrophe in it would otherwise end the literal early.
  app_name = conn.escape_string(QC.app_name.to_s)
  conn.exec("SET application_name = '#{app_name}'")
  conn
end
normalize_db_url(url)
click to toggle source
# File lib/queue_classic/conn_adapter.rb, line 88
# Turns a parsed database URL into the positional argument list that
# establish_new splats into PG.connect.
#
# url - object responding to host, port, path, user and password.
#
# Returns a seven-element Array:
#   [host, port, nil, '', database, user, password]
# Any "%2F" in the host (either case) becomes "/", the port defaults
# to 5432, and the database is the path with every "/" removed.
def normalize_db_url(url)
  location = url.host
  # host or percent-encoded socket path
  location = location.gsub(/%2F/i, '/') if location
  port = url.port || 5432
  database = url.path.gsub("/", "")

  # The nil and '' slots are the opts and tty positions.
  [location, port, nil, '', database, url.user, url.password]
end
validate!(c)
click to toggle source
# File lib/queue_classic/conn_adapter.rb, line 72
# Checks that c is a PG::Connection.
#
# Returns c unchanged when it is.
# Raises ArgumentError, naming the class of c, when it is not.
def validate!(c)
  unless c.is_a?(PG::Connection)
    problem = "connection must be an instance of PG::Connection, but was #{c.class}"
    raise(ArgumentError, problem)
  end

  c
end
wait_for_notify(t)
click to toggle source
# File lib/queue_classic/conn_adapter.rb, line 60
# Calls @connection.wait_for_notify(t) and collects the third argument
# of every block invocation it makes.
#
# Returns the collected values as an Array (empty when the block was
# never invoked).
def wait_for_notify(t)
  messages = []
  @connection.wait_for_notify(t) { |_event, _pid, message| messages << message }
  messages
end