class Tackle::Consumer
Public Class Methods
new(params)
click to toggle source
# File lib/tackle/consumer.rb, line 14
# Builds a consumer from a params object.
#
# Keeps the params, takes the logger from them, and then runs
# setup_rabbit_connections.
#
# @param params [#logger] consumer settings; the other readers used on it
#   are the ones called by setup_rabbit_connections, process_message and
#   redeliver_message
def initialize(params)
  @params = params
  @logger = params.logger

  setup_rabbit_connections
end
Public Instance Methods
process_message(message, &block)
click to toggle source
# File lib/tackle/consumer.rb, line 44
# Hands one message's payload to the caller-supplied processor and settles
# the message according to the outcome.
#
# The whole operation runs inside
# Tackle::Consumer::ActiveRecordConnectionReaper.run.
#
# Outcomes:
# * response is Tackle::ACK (always the case unless @params.manual_ack?)
#   -> message.ack
# * response is Tackle::NACK -> redeliver_message
# * any other response       -> RuntimeError, handled like any exception
# * any exception            -> redeliver_message, then the exception is
#   re-raised to the caller
#
# A failure raised by redeliver_message itself in the NACK branch is
# re-raised without a second redelivery attempt, so the message cannot be
# published twice.
#
# @param message [#payload, #log_info, #ack] message being processed
# @param block [Proc] processor; called with message.payload
# @return [Object] whatever message.ack or redeliver_message returned
# @raise [Exception] anything raised while processing or settling the message
def process_message(message, &block)
  Tackle::Consumer::ActiveRecordConnectionReaper.run do
    # Becomes true once the NACK branch has begun its own redelivery, so the
    # rescue clause below does not redeliver the same message again.
    redelivery_started = false

    begin
      message.log_info "Calling message processor"

      response = block.call(message.payload)
      # Without manual acknowledgement the processor's return value is ignored.
      response = Tackle::ACK unless @params.manual_ack?

      case response
      when Tackle::ACK
        message.ack
      when Tackle::NACK
        redelivery_started = true
        redeliver_message(message, "Received Tackle::NACK")
      else
        raise "Response must be either Tackle::ACK or Tackle::NACK"
      end
    rescue Exception => ex
      # Exception (not only StandardError) is rescued here: every failure is
      # redelivered before being passed on to the caller.
      redeliver_message(message, "Received exception '#{ex}'") unless redelivery_started
      raise
    end
  end
end
redeliver_message(message, reason)
click to toggle source
# File lib/tackle/consumer.rb, line 71
# Logs a processing failure, republishes the message and nacks it.
#
# The message goes to @delay_queue while its retry count is below
# @params.retry_limit, otherwise to @dead_queue. The nack is sent after the
# publish.
#
# @param message [#log_error, #retry_count, #nack] the failed message
# @param reason [String] failure description appended to the first log line
# @return [Object] the result of message.nack
def redeliver_message(message, reason)
  message.log_error "Failed to process message. #{reason}"
  message.log_error "Retry count #{message.retry_count}/#{@params.retry_limit}"

  retries_left = message.retry_count < @params.retry_limit
  target_queue = retries_left ? @delay_queue : @dead_queue
  target_queue.publish(message)

  message.nack
end
setup_rabbit_connections()
click to toggle source
# File lib/tackle/consumer.rb, line 21
# Creates the connection, the exchange and the main, delay and dead queues
# from @params, then binds the exchange to @params.exchange.
#
# The objects are created in dependency order: the connection first, the
# exchange on top of it, and the three queues on top of both.
#
# @return [Object] the result of @exchange.bind_to_exchange
def setup_rabbit_connections
  @connection = Tackle::Connection.new(
    @params.amqp_url,
    @params.exception_handler,
    @logger,
    @params.connection
  )
  @exchange = Exchange.new(@params.service, @params.routing_key, @connection, @logger)

  # Every queue is built from the same exchange, connection and logger.
  queue_args = [@exchange, @connection, @logger]
  @main_queue = MainQueue.new(*queue_args)
  @delay_queue = DelayQueue.new(@params.retry_delay, *queue_args)
  @dead_queue = DeadQueue.new(*queue_args)

  @exchange.bind_to_exchange(@params.exchange)
end
subscribe(&block)
click to toggle source
# File lib/tackle/consumer.rb, line 32
# Subscribes to the main queue and passes every received message, together
# with the caller's block, to process_message.
#
# An Interrupt closes the connection and is not re-raised. Any other
# StandardError is logged and re-raised.
#
# @param block [Proc] message processor forwarded to process_message
# @return [Object] the result of @main_queue.subscribe, or of
#   @connection.close when interrupted
# @raise [StandardError] whatever the subscription raised
def subscribe(&block)
  queue_name = @main_queue.name
  @logger.info "Subscribing to the main queue '#{queue_name}'"

  @main_queue.subscribe do |message|
    process_message(message, &block)
  end
rescue Interrupt
  @connection.close
rescue StandardError => error
  @logger.error("An exception occured message='#{error.message}'")
  raise error
end