class PgNotifier::Manager
Attributes
db_config[RW]
logger[RW]
timeout[RW]
Public Class Methods
new(attrs = {})
click to toggle source
# File lib/pg_notifier/manager.rb, line 9
# Builds a manager from an options hash.
#
# @param attrs [Hash] optional settings:
#   :logger    - object used for log output (default: a Logger writing to STDOUT)
#   :db_config - stored as-is for opening the database connection (default: {})
#   :timeout   - stored as-is for waiting on notifications (default: 0.1)
def initialize(attrs = {})
  # Block form of fetch: the STDOUT logger is only constructed when the
  # caller did not supply one.
  @logger = attrs.fetch(:logger) { Logger.new(STDOUT) }
  @db_config = attrs.fetch(:db_config) { {} }
  @timeout = attrs.fetch(:timeout, 0.1)
  @finish = false

  # NOTE: this is a process-wide Thread setting, not one scoped to this manager.
  Thread.abort_on_exception = true

  @connection_mutex = Mutex.new
end
Public Instance Methods
channels()
click to toggle source
# File lib/pg_notifier/manager.rb, line 29
# Lists every channel that currently has an entry in the subscription registry.
#
# @return [Array] the keys of subscriptions_by_channels
def channels
  subscriptions_by_channels.each_key.to_a
end
connection()
click to toggle source
# File lib/pg_notifier/manager.rb, line 33
# Returns the memoized database connection, opening it with db_config on
# first use.
#
# @return [PG::Connection]
def connection
  return @connection if @connection

  @connection = PG::Connection.open(db_config)
end
notify(channel, options = {}, &block)
click to toggle source
# File lib/pg_notifier/manager.rb, line 20
# Registers a new Subscription for the given channel.
#
# @param channel key under which the subscription is stored
# @param options [Hash] passed through to Subscription.new
# @param block   passed through to Subscription.new
# @return [Array] the list of subscriptions now registered for the channel
def notify(channel, options = {}, &block)
  # Create the per-channel list on first use, then append to it.
  listeners = (subscriptions_by_channels[channel] ||= [])
  listeners << Subscription.new(channel, options, &block)
end
run()
click to toggle source
# File lib/pg_notifier/manager.rb, line 37
# Starts a background thread that issues LISTEN for every registered channel
# and then dispatches incoming notifications to the matching subscriptions
# until @finish is set.
#
# @return [Thread] the thread running the listen/dispatch loop
# @raise [ChannelNotLaunched] (inside the thread) when a LISTEN command does
#   not report PG::PGRES_COMMAND_OK
def run
  logger.info "Starting pg_notifier for #{channels.count} channels: [ #{channels.join(' ')} ]"

  Thread.new do
    channels.each do |channel|
      listen_result = @connection_mutex.synchronize { connection.exec("LISTEN #{channel};") }
      next if listen_result.result_status.eql?(PG::PGRES_COMMAND_OK)

      raise ChannelNotLaunched, "Channel ##{channel} not launched"
    end

    until @finish
      notification = wait_notification
      # A falsy result means nothing arrived this round; check @finish again.
      next unless notification

      logger.info format("Notifying channel: %s, pid: %s, payload: %s", *notification)

      # Channels without a registry entry are dispatched to nobody.
      receivers = subscriptions_by_channels.fetch(notification.first, [])
      receivers.each do |subscription|
        subscription.notify(*notification)
      end
    end
  end
end
shutdown()
click to toggle source
# File lib/pg_notifier/manager.rb, line 60
# Stops the notifier: flags the run loop to finish, releases the database
# connection if one was opened, and terminates the process with status 0.
#
# This method does not return; it ends by calling exit(0).
def shutdown
  logger.info 'Shutting down'
  @finish = true

  @connection_mutex.synchronize do
    # Inspect the ivar rather than the lazy `connection` reader: when no
    # connection was ever opened there is nothing to UNLISTEN or close, and
    # shutting down must not open a fresh connection just to discard it.
    if @connection && !@connection.finished?
      @connection.async_exec "UNLISTEN *;"
      @connection.finish
    end
  end

  exit(0)
end
subscriptions_by_channels()
click to toggle source
# File lib/pg_notifier/manager.rb, line 25
# Returns the memoized registry hash, creating an empty one on first access.
# Other methods in this class key it by channel and store arrays of
# subscriptions as values.
#
# @return [Hash]
def subscriptions_by_channels
  return @subscriptions_by_channels if @subscriptions_by_channels

  @subscriptions_by_channels = {}
end
Private Instance Methods
wait_notification()
click to toggle source
# File lib/pg_notifier/manager.rb, line 77
# Waits, under the connection mutex, for one notification on the connection,
# passing the configured timeout to wait_for_notify.
#
# @return [Array, nil] [channel, pid, payload] when the wait_for_notify block
#   was invoked, otherwise nil
def wait_notification
  received = nil

  @connection_mutex.synchronize do
    connection.wait_for_notify(timeout) do |channel, pid, payload|
      received = [channel, pid, payload]
    end
  end

  received
end