class SqsConsumer::Consumer

Constants

RECEIVE_MESSAGES_DEFAULT

Public Class Methods

new(endpoint:, region:, work_queue_name:, throttle_sleep_duration: 0.0, receive_messages_options: {}, message_filter: nil, processor_action: nil, success_result: nil, error_callback: nil) click to toggle source
# File lib/sqs_consumer/consumer.rb, line 8
# Builds a consumer for a single SQS work queue.
#
# Only configuration is stored here; the AWS client is created later by
# #create_queue, so constructing a consumer performs no network calls.
#
# @param endpoint stored under :endpoint in the client config
# @param region stored under :region in the client config
# @param work_queue_name queue name that #create_queue resolves to a URL
# @param throttle_sleep_duration [Numeric] seconds #run sleeps between polls;
#   zero (the default) disables the sleep
# @param receive_messages_options [Hash] options handed to every
#   receive_messages call; layered on top of RECEIVE_MESSAGES_DEFAULT
# @param message_filter [#call, nil] called with a message's
#   message_attributes; a falsy result skips the message. nil accepts all.
# @param processor_action [#call, nil] called with each accepted message
# @param success_result value the processor result must equal for the message
#   to be deleted; nil means delete whenever processing does not raise
# @param error_callback [#call, nil] called with (exception, message) when
#   processing a message raises
def initialize(endpoint:, region:, work_queue_name:, throttle_sleep_duration: 0.0,
  receive_messages_options: {}, message_filter: nil, processor_action: nil, success_result: nil, error_callback: nil)
  @client_config = {
    endpoint: endpoint,
    region: region
  }
  @work_queue_name = work_queue_name
  @throttle_sleep_duration = throttle_sleep_duration
  @logger = PaulBunyan.logger
  # Defaults form the base and caller options are merged over them, so an
  # explicitly supplied key is never clobbered by its default value.
  @receive_messages_options = RECEIVE_MESSAGES_DEFAULT.merge(receive_messages_options)
  @message_filter = message_filter
  @processor_action = processor_action
  @success_result = success_result
  @error_callback = error_callback
  @running = false
end

Public Instance Methods

create_queue() click to toggle source
# File lib/sqs_consumer/consumer.rb, line 37
# Resolves the configured queue name to its URL and returns a queue resource
# bound to it.
#
# A new Aws::SQS::Client is built from the stored endpoint/region on every
# call, and one get_queue_url request is issued to look up the URL.
#
# @return [Aws::SQS::Queue] queue resource for @work_queue_name
def create_queue
  sqs_client = Aws::SQS::Client.new(@client_config)
  sqs_queue_url = sqs_client.get_queue_url(queue_name: @work_queue_name)[:queue_url]
  # Hand the queue the client itself through the documented :client option so
  # it reuses the configured endpoint and region. The client's config object
  # is not an options Hash and must not be passed in its place.
  Aws::SQS::Queue.new(sqs_queue_url, client: sqs_client)
end
run() click to toggle source
# File lib/sqs_consumer/consumer.rb, line 43
# Polls the work queue and processes messages until #stop is called.
#
# Each pass receives one batch and, for every message accepted by
# #should_process?, invokes the processor action. The message is deleted when
# no success result is configured or the processor's return value equals it.
# The running flag is checked only after a batch has been handled, so at least
# one batch is always polled and an in-flight batch is always finished.
#
# Errors raised while handling a message never abort the loop: they go to the
# error callback when one is configured and are logged otherwise, and the
# message is left undeleted. An exception raised by the error callback itself
# is not rescued and propagates to the caller.
def run
  @running = true
  queue = create_queue

  loop do
    sqs_messages = queue.receive_messages(@receive_messages_options)

    @logger.debug { "Got #{sqs_messages.size} messages from SQS." } unless sqs_messages.size == 0

    sqs_messages.each do |sqs_message|
      begin
        if should_process? sqs_message
          result = @processor_action&.call(sqs_message)
          if @success_result.nil? || @success_result == result
            sqs_message.delete
          end
        end
      rescue => e
        if @error_callback
          @error_callback.call(e, sqs_message)
        else
          # Without a callback the failure would otherwise vanish without a
          # trace; record it so undeleted messages can be diagnosed.
          @logger.error { "Error processing SQS message: #{e.class}: #{e.message}" }
        end
      end
    end
    break unless running?
    sleep @throttle_sleep_duration unless @throttle_sleep_duration.zero?
  end
end
running?() click to toggle source
# File lib/sqs_consumer/consumer.rb, line 33
# Reports the current value of the running flag.
#
# The flag is set to false by the constructor and by #stop, and to true when
# #run starts; #run consults it after each batch to decide whether to go on.
#
# @return the value of @running
def running?
  @running
end
should_process?(sqs_message) click to toggle source
# File lib/sqs_consumer/consumer.rb, line 25
# Decides whether a received message should be passed to the processor.
#
# With no filter configured every message is accepted. Otherwise the filter
# is called with the message's attributes and its result is returned as-is.
#
# @param sqs_message message object responding to #message_attributes
# @return true when no filter is set, otherwise whatever the filter returns
def should_process?(sqs_message)
  filter = @message_filter
  return true if filter.nil?

  filter.call(sqs_message.message_attributes)
end
stop() click to toggle source
# File lib/sqs_consumer/consumer.rb, line 29
# Requests that the polling loop end by clearing the running flag.
#
# #run checks the flag only after it has finished handling the current batch,
# so the loop exits once that batch is done rather than immediately. Calling
# this before #run has no lasting effect, because #run sets the flag to true
# when it starts.
def stop
  @running = false
end