class QC::ConnAdapter
Public Class Methods
new(args={})
click to toggle source
# File lib/queue_classic/conn_adapter.rb, line 9
# Builds an adapter from an options hash.
#
# args - Hash of options (defaults to an empty hash):
#        :active_record_connection_share - stored as given; consulted by
#                                          #connection to decide whether to
#                                          borrow ActiveRecord's connection.
#        :connection                     - a connection object to reuse; when
#                                          absent the ivar is left nil.
def initialize(args={})
  # Guards the connection-using public methods of this adapter.
  @mutex = Mutex.new
  @active_record_connection_share, @_connection =
    args[:active_record_connection_share], args[:connection]
end
Public Instance Methods
connection()
click to toggle source
# File lib/queue_classic/conn_adapter.rb, line 15
# Returns the connection this adapter should talk through.
#
# When connection sharing was requested and the ActiveRecord constant is
# loaded, ActiveRecord's raw connection is returned on every call. Otherwise
# the adapter's own connection is returned, created through establish_new on
# first use and memoized afterwards.
def connection
  use_shared = @active_record_connection_share && Object.const_defined?('ActiveRecord')
  return ActiveRecord::Base.connection.raw_connection if use_shared

  @_connection ||= establish_new
end
disconnect()
click to toggle source
# File lib/queue_classic/conn_adapter.rb, line 51
# Closes the current connection while holding the adapter mutex.
#
# Any StandardError raised while closing is logged through QC.log and
# swallowed, so this method does not propagate close failures.
def disconnect
  @mutex.synchronize do
    begin
      connection.close
    rescue StandardError => error
      QC.log(at: 'disconnect', error: error.message)
    end
  end
end
execute(stmt, *params)
click to toggle source
# File lib/queue_classic/conn_adapter.rb, line 23
# Runs a SQL statement while holding the adapter mutex.
#
# stmt   - SQL text; may contain $1-style placeholders when params are given.
# params - bind values for the placeholders (optional).
#
# Returns nil when the statement produced no rows, the single row when it
# produced exactly one, and an Array of rows when it produced more.
# Raises PG::Error (re-raised after logging and resetting the connection).
def execute(stmt, *params)
  @mutex.synchronize do
    QC.log(:at => "exec_sql", :sql => stmt.inspect)
    begin
      # Bind parameters go through exec_params; newer pg releases deprecate
      # handing them to plain exec.
      res = if params.empty?
              connection.exec(stmt)
            else
              connection.exec_params(stmt, params)
            end
      rows = res.to_a
      rows.length > 1 ? rows : rows.pop
    rescue PG::Error => e
      QC.log(:error => e.inspect)
      # Reset so the next call starts from a fresh connection state.
      connection.reset
      raise
    end
  end
end
server_version()
click to toggle source
# File lib/queue_classic/conn_adapter.rb, line 61
# Returns the server's server_version_num setting as an Integer.
#
# The value is fetched with a SHOW statement through #execute and memoized
# once it is truthy; a missing value yields nil and is looked up again on the
# next call.
def server_version
  @server_version ||= begin
    row = execute("SHOW server_version_num;")
    raw = row["server_version_num"]
    raw ? raw.to_i : raw
  end
end
wait(time, *channels)
click to toggle source
# File lib/queue_classic/conn_adapter.rb, line 40
# Listens on the given channels, waits for a notification, then stops
# listening and drains whatever notifications are still queued. The whole
# sequence runs while holding the adapter mutex.
#
# time     - passed through to wait_for_notify as the wait duration.
# channels - channel names; each is converted with to_s and quoted as an
#            SQL identifier.
#
# Returns whatever drain_notify returns.
def wait(time, *channels)
  @mutex.synchronize do
    # Double any embedded double quote so a channel name cannot terminate the
    # quoted identifier and alter the statement.
    quoted = channels.map { |c| '"' + c.to_s.gsub('"', '""') + '"' }

    connection.exec(quoted.map { |name| "LISTEN #{name}" }.join(';'))
    wait_for_notify(time)
    connection.exec(quoted.map { |name| "UNLISTEN #{name}" }.join(';'))
    drain_notify
  end
end
Private Instance Methods
db_url()
click to toggle source
# File lib/queue_classic/conn_adapter.rb, line 117
# Returns the parsed database URL, memoized after the first successful lookup.
#
# QC_DATABASE_URL takes precedence over DATABASE_URL. A variable that is set
# but empty or whitespace-only is treated as unset, so it cannot shadow the
# other variable or be parsed into an empty URI.
#
# Raises ArgumentError when neither variable holds a usable value.
def db_url
  return @db_url if defined?(@db_url) && @db_url

  candidates = [ENV["QC_DATABASE_URL"], ENV["DATABASE_URL"]]
  url = candidates.find { |value| value && !value.strip.empty? }
  raise(ArgumentError, "missing QC_DATABASE_URL or DATABASE_URL") unless url

  @db_url = URI.parse(url)
end
drain_notify()
click to toggle source
# File lib/queue_classic/conn_adapter.rb, line 76
# Reads pending notifications from the connection until `notifies` returns
# nil, logging one "drain_notifications" entry per notification read.
def drain_notify
  QC.log(:at => "drain_notifications") until connection.notifies.nil?
end
establish_new()
click to toggle source
# File lib/queue_classic/conn_adapter.rb, line 88
# Opens a new connection from the configured database URL, checks the server
# version, and sets the session's application_name.
#
# Returns the open connection.
# Raises RuntimeError when the server reports a version below 90600; the
# freshly opened connection is closed first so it is not leaked.
def establish_new
  QC.log(:at => "establish_conn")
  conn = PG.connect(*normalize_db_url(db_url))
  QC.log(:error => conn.error) if conn.status != PG::CONNECTION_OK

  # Read the version once, before a possible close, so the message can use it.
  version = conn.server_version
  if version < 90600
    conn.close
    raise "This version of Queue Classic does not support Postgres older than 9.6 (90600). This version is #{version}. If you need that support, please use an older version."
  end

  # Escape the name so a quote in it cannot break out of the string literal.
  app_name = conn.escape_string(QC.app_name.to_s)
  conn.exec("SET application_name = '#{app_name}'")
  conn
end
normalize_db_url(url)
click to toggle source
# File lib/queue_classic/conn_adapter.rb, line 103
# Converts a parsed database URL into the positional argument list that
# establish_new splats into PG.connect.
#
# url - a URI object (responds to host, port, path, user, password).
#
# Returns [host, port, nil, nil, database, user, password]. The port defaults
# to 5432. User and password are percent-decoded, because URI returns them in
# their encoded form and they are passed on verbatim as credentials.
def normalize_db_url(url)
  # Turns each run of %XX escapes into the bytes it encodes. "+" is left
  # alone: it is a literal character in the userinfo part of a URI.
  unescape = lambda do |component|
    component && component.gsub(/(?:%\h\h)+/) do |escaped|
      [escaped.delete('%')].pack('H*').force_encoding(component.encoding)
    end
  end

  host = url.host
  host = host.gsub(/%2F/i, '/') if host

  [
    host, # host or percent-encoded socket path
    url.port || 5432,
    nil, nil, # opts, tty
    url.path.gsub("/", ""), # database name
    unescape.call(url.user),
    unescape.call(url.password)
  ]
end
validate!(c)
click to toggle source
# File lib/queue_classic/conn_adapter.rb, line 82
# Ensures the given object is a PG::Connection.
#
# c - the object to check.
#
# Returns c unchanged when it is a PG::Connection.
# Raises ArgumentError, naming the actual class, otherwise.
def validate!(c)
  unless c.is_a?(PG::Connection)
    raise(ArgumentError, "connection must be an instance of PG::Connection, but was #{c.class}")
  end

  c
end
wait_for_notify(t)
click to toggle source
# File lib/queue_classic/conn_adapter.rb, line 70
# Waits on the connection for a notification and collects the third value
# yielded to the block (named `msg` in the original) for each one received.
#
# t - passed straight to the connection's wait_for_notify.
#
# Returns an Array of the collected values; empty when the block never ran.
def wait_for_notify(t)
  payloads = []
  connection.wait_for_notify(t) { |_event, _pid, msg| payloads << msg }
  payloads
end