class Tackle::Consumer

Public Class Methods

new(params) click to toggle source
# File lib/tackle/consumer.rb, line 14
def initialize(params)
  @params = params
  @logger = @params.logger

  setup_rabbit_connections
end

Public Instance Methods

process_message(message, &block) click to toggle source
# File lib/tackle/consumer.rb, line 44
def process_message(message, &block)
  Tackle::Consumer::ActiveRecordConnectionReaper.run do
    begin
      message.log_info "Calling message processor"

      response = block.call(message.payload)

      unless @params.manual_ack?
        response = Tackle::ACK
      end

      case response
      when Tackle::ACK
        message.ack
      when Tackle::NACK
        redeliver_message(message, "Received Tackle::NACK")
      else
        raise "Response must be either Tackle::ACK or Tackle::NACK"
      end
    rescue Exception => ex
      redeliver_message(message, "Received exception '#{ex}'")

      raise ex
    end
  end
end
redeliver_message(message, reason) click to toggle source
# File lib/tackle/consumer.rb, line 71
def redeliver_message(message, reason)
  message.log_error "Failed to process message. #{reason}"
  message.log_error "Retry count #{message.retry_count}/#{@params.retry_limit}"

  if message.retry_count < @params.retry_limit
    @delay_queue.publish(message)
  else
    @dead_queue.publish(message)
  end

  message.nack
end
setup_rabbit_connections() click to toggle source
# File lib/tackle/consumer.rb, line 21
def setup_rabbit_connections
  @connection = Tackle::Connection.new(@params.amqp_url, @params.exception_handler, @logger, @params.connection)

  @exchange    = Exchange.new(@params.service, @params.routing_key, @connection, @logger)
  @main_queue  = MainQueue.new(@exchange, @connection, @logger)
  @delay_queue = DelayQueue.new(@params.retry_delay, @exchange, @connection, @logger)
  @dead_queue  = DeadQueue.new(@exchange, @connection, @logger)

  @exchange.bind_to_exchange(@params.exchange)
end
subscribe(&block) click to toggle source
# File lib/tackle/consumer.rb, line 32
def subscribe(&block)
  @logger.info "Subscribing to the main queue '#{@main_queue.name}'"

  @main_queue.subscribe { |message| process_message(message, &block) }
rescue Interrupt => _
  @connection.close
rescue StandardError => ex
  @logger.error("An exception occured message='#{ex.message}'")

  raise ex
end