class AMQPHelpers::Daemon

Constants

DEFAULT_RECONNECT_WAIT_TIME

Attributes

connection_params[RW]
environment[W]
exchanges[RW]
logger[W]
name[RW]
queue_name[W]
queue_params[W]
reconnect_wait_time[W]

Public Class Methods

new(config) click to toggle source
# File lib/amqp_helpers/daemon.rb, line 18
# Builds the instance from +config+, a collection yielding key/value pairs.
#
# Every key is mapped to a "<key>=" writer name; when the instance responds
# to that writer it is called with the matching value. Keys that have no
# such writer are skipped without error.
def initialize(config)
  config.each do |option, setting|
    writer = "#{option}="
    next unless respond_to?(writer)

    send(writer, setting)
  end
end

Public Instance Methods

environment() click to toggle source
# File lib/amqp_helpers/daemon.rb, line 50
# Returns the environment name. When none has been assigned (or the stored
# value is nil/false), 'development' is stored and returned.
def environment
  return @environment if @environment

  @environment = 'development'
end
logger() click to toggle source
# File lib/amqp_helpers/daemon.rb, line 62
# Returns the logger, building and memoizing one on first use.
#
# In the 'development' environment this is a Logger writing to STDOUT;
# otherwise it is a Syslogger constructed with the daemon name,
# Syslog::LOG_PID and Syslog::LOG_LOCAL0.
def logger
  return @logger if @logger

  @logger =
    if environment == 'development'
      Logger.new(STDOUT)
    else
      Syslogger.new(name, Syslog::LOG_PID, Syslog::LOG_LOCAL0)
    end
end
queue_name() click to toggle source
# File lib/amqp_helpers/daemon.rb, line 58
# Returns the queue name. When none has been assigned, it is derived as
# "<hostname>.<name>" (hostname taken from Socket.gethostname) and memoized.
def queue_name
  return @queue_name if @queue_name

  host = Socket.gethostname
  @queue_name = "#{host}.#{name}"
end
queue_params() click to toggle source
# File lib/amqp_helpers/daemon.rb, line 54
# Returns the queue parameters, storing and returning an empty Hash when
# none have been assigned.
def queue_params
  @queue_params = {} unless @queue_params
  @queue_params
end
reconnect_wait_time() click to toggle source
# File lib/amqp_helpers/daemon.rb, line 70
# Returns the reconnect wait time, falling back to (and storing)
# DEFAULT_RECONNECT_WAIT_TIME when none has been assigned.
def reconnect_wait_time
  return @reconnect_wait_time if @reconnect_wait_time

  @reconnect_wait_time = DEFAULT_RECONNECT_WAIT_TIME
end
start(opts = {}, &handler) click to toggle source
# File lib/amqp_helpers/daemon.rb, line 27
# Starts the daemon.
#
# Logs a startup message, then calls AMQP.start with connection_params merged
# over a default :on_tcp_connection_failure callback (so a value supplied in
# connection_params wins). Inside the AMQP.start block it:
#
# * registers the open, error, TCP-connection-loss and recovery callbacks on
#   the connection,
# * builds the channel and the queue via initialize_channel/initialize_queue,
# * subscribes +handler+ to the queue, passing +opts+ through to subscribe,
# * traps INT and TERM with a shared handler that logs the signal and closes
#   the connection, stopping EventMachine from the close callback.
def start(opts = {}, &handler)
  logger.info "Starting #{name} daemon..."
  failure_callback = method(:handle_tcp_connection_failure).to_proc
  defaults = { on_tcp_connection_failure: failure_callback }

  AMQP.start(defaults.merge(connection_params)) do |connection|
    connection.on_open(&method(:handle_open))
    connection.on_error(&method(:handle_connection_error))
    channel = initialize_channel(connection)
    connection.on_tcp_connection_loss(&method(:handle_tcp_connection_loss))
    connection.on_recovery(&method(:handle_recovery))

    initialize_queue(channel).subscribe(opts, &handler)

    shutdown = proc do |signal|
      logger.info "Signal #{signal} received. #{name} is going down... I REPEAT: WE ARE GOING DOWN!"
      connection.close { EventMachine.stop }
    end
    %w[INT TERM].each { |signal_name| Signal.trap signal_name, shutdown }
  end
end

Protected Instance Methods

handle_channel_error(channel, channel_close) click to toggle source
# File lib/amqp_helpers/daemon.rb, line 95
# Channel error callback: logs the reply code and reply text carried by
# +channel_close+ at error level, then raises ChannelError with the reply
# text as its message. +channel+ is accepted but not used.
def handle_channel_error(channel, channel_close)
  code = channel_close.reply_code
  text = channel_close.reply_text
  logger.error "[channel.close] Reply code = #{code}, reply text = #{text}"
  raise ChannelError, text
end
handle_connection_error(connection, connection_close) click to toggle source
# File lib/amqp_helpers/daemon.rb, line 84
# Connection error callback: logs the reply code and reply text carried by
# +connection_close+ at error level.
#
# For reply code 320 it logs an info message and asks +connection+ to
# periodically reconnect using reconnect_wait_time; for any other reply code
# it raises ConnectionError with the reply text as its message.
def handle_connection_error(connection, connection_close)
  code = connection_close.reply_code
  text = connection_close.reply_text
  logger.error "[connection.close] Reply code = #{code}, reply text = #{text}"

  # check if graceful broker shutdown
  raise ConnectionError, text unless code == 320

  wait = reconnect_wait_time
  logger.info "[connection.close] Setting up a periodic reconnection timer (#{wait}s)..."
  connection.periodically_reconnect(wait)
end
handle_open() click to toggle source
# File lib/amqp_helpers/daemon.rb, line 80
# Connection-open callback: logs, at info level, that the named daemon
# opened its AMQP connection.
def handle_open
  message = "#{name} successfully opened AMQP connection"
  logger.info(message)
end
handle_recovery(connection, settings) click to toggle source
# File lib/amqp_helpers/daemon.rb, line 105
# Recovery callback: logs an info message announcing the reconnection.
# Neither +connection+ nor +settings+ is used.
def handle_recovery(connection, settings)
  logger.info('Yay, reconnected! All systems go!')
end
handle_tcp_connection_failure(settings) click to toggle source
# File lib/amqp_helpers/daemon.rb, line 75
# TCP connection failure callback: logs the host and port read from
# +settings+ (keys :host and :port) at error level, then raises
# ConnectionError.
def handle_tcp_connection_failure(settings)
  endpoint = "#{settings[:host]}:#{settings[:port]}"
  logger.error "[network failure] Could not connect to #{endpoint}"
  raise ConnectionError, 'Failed to connect!'
end
handle_tcp_connection_loss(connection, settings) click to toggle source
# File lib/amqp_helpers/daemon.rb, line 100
# TCP connection loss callback: logs an error message and asks +connection+
# to reconnect, passing false and reconnect_wait_time. +settings+ is not used.
def handle_tcp_connection_loss(connection, settings)
  logger.error('[network failure] Trying to reconnect...')
  wait = reconnect_wait_time
  connection.reconnect(false, wait)
end
initialize_channel(connection) click to toggle source
# File lib/amqp_helpers/daemon.rb, line 109
# Opens an AMQP::Channel on +connection+, enables auto_recovery on it and
# registers handle_channel_error as its error callback.
#
# Returns the channel itself. The channel is yielded through #tap so the
# result never depends on what on_error returns; the caller uses the result
# as the channel to declare the queue on.
def initialize_channel(connection)
  AMQP::Channel.new(connection).tap do |channel|
    channel.auto_recovery = true
    channel.on_error(&method(:handle_channel_error))
  end
end
initialize_queue(channel) click to toggle source
# File lib/amqp_helpers/daemon.rb, line 115
# Declares the queue (queue_name, queue_params) on +channel+ and binds it to
# every configured exchange, returning the queue.
#
# For each entry in exchanges, a topic exchange is declared with the entry's
# :params, and the queue is bound to it once per item in the entry's
# :bindings. Missing configuration is treated as empty rather than raising:
# no exchanges means nothing to bind, an entry without :params declares the
# exchange with an empty options hash, and an entry without :bindings
# declares the exchange but binds nothing.
def initialize_queue(channel)
  channel.queue(queue_name, queue_params).tap do |queue|
    (exchanges || {}).each do |exchange_name, exchange_config|
      exchange_config ||= {}
      exchange = channel.topic(exchange_name, exchange_config[:params] || {})
      (exchange_config[:bindings] || []).each do |params|
        queue.bind(exchange, params)
      end
    end
  end
end