class PgNotifier::Manager

Attributes

db_config[RW]
logger[RW]
timeout[RW]

Public Class Methods

new(attrs = {})
# File lib/pg_notifier/manager.rb, line 9
# Sets up a manager from an optional settings hash.
#
# @param attrs [Hash] optional settings
# @option attrs [#info] :logger object used for status messages; when absent a
#   Logger writing to STDOUT is created
# @option attrs [Hash] :db_config settings later handed to PG::Connection.open
#   (defaults to an empty hash)
# @option attrs [Numeric] :timeout value later passed to wait_for_notify
#   (defaults to 0.1; presumably seconds — confirm against the pg gem)
def initialize(attrs = {})
  # Block form of fetch: the default Logger is only built when the caller
  # did not supply one, instead of being allocated and thrown away.
  @logger = attrs.fetch(:logger) { Logger.new(STDOUT) }
  @db_config = attrs.fetch(:db_config) { {} }
  @timeout = attrs.fetch(:timeout, 0.1)

  # Flag polled by the listener loop; flipped to true by #shutdown.
  @finish = false
  # NOTE: this is a process-wide Thread setting, not scoped to this instance.
  Thread.abort_on_exception = true

  # Serializes every use of the shared database connection.
  @connection_mutex = Mutex.new
end

Public Instance Methods

channels()
# File lib/pg_notifier/manager.rb, line 29
# Lists the channel names that currently have an entry in the
# subscription registry.
#
# @return [Array] the registry's keys
def channels
  registry = subscriptions_by_channels
  registry.keys
end
connection()
# File lib/pg_notifier/manager.rb, line 33
# Returns the memoized database connection, opening it with +db_config+
# the first time it is requested.
#
# @return [PG::Connection]
def connection
  return @connection if @connection

  @connection = PG::Connection.open(db_config)
end
notify(channel, options = {}, &block)
# File lib/pg_notifier/manager.rb, line 20
# Registers a new Subscription for +channel+, creating the channel's
# list in the registry on first use.
#
# @param channel the registry key the subscription is stored under
# @param options [Hash] forwarded unchanged to Subscription.new
# @param block forwarded unchanged to Subscription.new
# @return [Array] the channel's subscription list, including the new entry
def notify(channel, options = {}, &block)
  registry = subscriptions_by_channels
  listeners = (registry[channel] ||= [])
  listeners << Subscription.new(channel, options, &block)
end
run()
# File lib/pg_notifier/manager.rb, line 37
# Starts the listener in a background thread.
#
# The thread first issues a LISTEN for every registered channel, then
# polls for notifications until the +@finish+ flag is set, handing each
# notification to the subscriptions registered for its channel.
#
# @return [Thread] the listener thread
# @raise [ChannelNotLaunched] (inside the thread) when a LISTEN does not
#   report PGRES_COMMAND_OK
def run
  logger.info "Starting pg_notifier for #{channels.count} channels: [ #{channels.join(' ')} ]"

  Thread.new do
    channels.each do |channel|
      pg_result = @connection_mutex.synchronize do
        # Quote the channel as an SQL identifier so names containing
        # dashes, spaces, quotes or upper-case letters are listened to
        # verbatim instead of breaking (or altering) the statement.
        connection.exec "LISTEN #{connection.quote_ident(channel.to_s)};"
      end

      unless pg_result.result_status.eql? PG::PGRES_COMMAND_OK
        raise ChannelNotLaunched, "Channel ##{channel} not launched"
      end
    end

    until @finish
      notification = wait_notification
      # nil means nothing arrived on this poll; go around again.
      next unless notification

      logger.info "Notifying channel: %s, pid: %s, payload: %s" % notification

      # notification is [channel, pid, payload]; its first element selects
      # the subscriptions to call.
      subscriptions = subscriptions_by_channels.fetch notification.first, []
      subscriptions.each { |subscription| subscription.notify(*notification) }
    end
  end
end
shutdown()
# File lib/pg_notifier/manager.rb, line 60
# Stops the listener loop, releases the database connection and exits
# the process with status 0.
#
# @return [void] never returns normally; ends with exit(0)
def shutdown
  logger.info 'Shutting down'

  # Tells the listener loop to stop polling.
  @finish = true

  @connection_mutex.synchronize do
    # Read the memoized connection directly: going through #connection
    # would open a brand-new connection just to close it (and raise if the
    # database is unreachable) when nothing was ever connected.
    conn = @connection
    if conn && !conn.finished?
      conn.async_exec "UNLISTEN *;"
      conn.finish
    end
  end

  exit(0)
end
subscriptions_by_channels()
# File lib/pg_notifier/manager.rb, line 25
# Returns the subscription registry, creating an empty Hash the first
# time it is requested. #notify stores an Array of subscriptions under
# each channel key.
#
# @return [Hash]
def subscriptions_by_channels
  @subscriptions_by_channels = {} unless @subscriptions_by_channels
  @subscriptions_by_channels
end

Private Instance Methods

wait_notification()
# File lib/pg_notifier/manager.rb, line 77
# Waits, while holding the connection mutex, for one notification on the
# shared connection, passing +timeout+ to wait_for_notify.
#
# @return [Array, nil] [channel, pid, payload] when wait_for_notify
#   yields a notification, otherwise nil
def wait_notification
  received = nil

  @connection_mutex.synchronize do
    connection.wait_for_notify(timeout) do |channel, pid, payload|
      received = [channel, pid, payload]
    end
  end

  received
end