class AMQPHelpers::Daemon
Constants
- DEFAULT_RECONNECT_WAIT_TIME
Attributes
connection_params[RW]
environment[W]
exchanges[RW]
logger[W]
name[RW]
queue_name[W]
queue_params[W]
reconnect_wait_time[W]
Public Class Methods
new(config)
click to toggle source
# File lib/amqp_helpers/daemon.rb, line 18 def initialize(config) config.each do |key, value| setter = "#{key}=" if respond_to?(setter) send(setter, value) end end end
Public Instance Methods
environment()
click to toggle source
# File lib/amqp_helpers/daemon.rb, line 50 def environment @environment ||= 'development' end
logger()
click to toggle source
# File lib/amqp_helpers/daemon.rb, line 62 def logger @logger ||= if environment == 'development' Logger.new(STDOUT) else Syslogger.new(name, Syslog::LOG_PID, Syslog::LOG_LOCAL0) end end
queue_name()
click to toggle source
# File lib/amqp_helpers/daemon.rb, line 58 def queue_name @queue_name ||= "#{Socket.gethostname}.#{name}" end
queue_params()
click to toggle source
# File lib/amqp_helpers/daemon.rb, line 54 def queue_params @queue_params ||= {} end
reconnect_wait_time()
click to toggle source
# File lib/amqp_helpers/daemon.rb, line 70 def reconnect_wait_time @reconnect_wait_time ||= DEFAULT_RECONNECT_WAIT_TIME end
start(opts = {}, &handler)
click to toggle source
# File lib/amqp_helpers/daemon.rb, line 27 def start(opts = {}, &handler) logger.info "Starting #{name} daemon..." tcp_connection_failure_handler = Proc.new(&method(:handle_tcp_connection_failure)) amqp_params = { on_tcp_connection_failure: tcp_connection_failure_handler}.merge(connection_params) AMQP.start(amqp_params) do |connection| connection.on_open(&method(:handle_open)) connection.on_error(&method(:handle_connection_error)) channel = initialize_channel(connection) connection.on_tcp_connection_loss(&method(:handle_tcp_connection_loss)) connection.on_recovery(&method(:handle_recovery)) queue = initialize_queue(channel) queue.subscribe(opts, &handler) show_stopper = Proc.new do |signal| logger.info "Signal #{signal} received. #{name} is going down... I REPEAT: WE ARE GOING DOWN!" connection.close { EventMachine.stop } end Signal.trap 'INT', show_stopper Signal.trap 'TERM', show_stopper end end
Protected Instance Methods
handle_channel_error(channel, channel_close)
click to toggle source
# File lib/amqp_helpers/daemon.rb, line 95 def handle_channel_error(channel, channel_close) logger.error "[channel.close] Reply code = #{channel_close.reply_code}, reply text = #{channel_close.reply_text}" raise ChannelError, channel_close.reply_text end
handle_connection_error(connection, connection_close)
click to toggle source
# File lib/amqp_helpers/daemon.rb, line 84 def handle_connection_error(connection, connection_close) logger.error "[connection.close] Reply code = #{connection_close.reply_code}, reply text = #{connection_close.reply_text}" # check if graceful broker shutdown if connection_close.reply_code == 320 logger.info "[connection.close] Setting up a periodic reconnection timer (#{reconnect_wait_time}s)..." connection.periodically_reconnect(reconnect_wait_time) else raise ConnectionError, connection_close.reply_text end end
handle_open()
click to toggle source
# File lib/amqp_helpers/daemon.rb, line 80 def handle_open logger.info "#{name} successfully opened AMQP connection" end
handle_recovery(connection, settings)
click to toggle source
# File lib/amqp_helpers/daemon.rb, line 105 def handle_recovery(connection, settings) logger.info 'Yay, reconnected! All systems go!' end
handle_tcp_connection_failure(settings)
click to toggle source
# File lib/amqp_helpers/daemon.rb, line 75 def handle_tcp_connection_failure(settings) logger.error "[network failure] Could not connect to #{settings[:host]}:#{settings[:port]}" raise ConnectionError, "Failed to connect!" end
handle_tcp_connection_loss(connection, settings)
click to toggle source
# File lib/amqp_helpers/daemon.rb, line 100 def handle_tcp_connection_loss(connection, settings) logger.error '[network failure] Trying to reconnect...' connection.reconnect(false, reconnect_wait_time) end
initialize_channel(connection)
click to toggle source
# File lib/amqp_helpers/daemon.rb, line 109 def initialize_channel(connection) channel = AMQP::Channel.new(connection) channel.auto_recovery = true channel.on_error(&method(:handle_channel_error)) end
initialize_queue(channel)
click to toggle source
# File lib/amqp_helpers/daemon.rb, line 115 def initialize_queue(channel) channel.queue(queue_name, queue_params).tap do |queue| exchanges.each do |exchange_name, exchange_config| exchange = channel.topic(exchange_name, exchange_config[:params]) (exchange_config[:bindings] || []).each do |params| queue.bind(exchange, params) end end end end