class Mimi::Messaging::SQS_SNS::Consumer

Message consumer for SQS queues

Constants

NACK_VISIBILITY_TIMEOUT

Visibility timeout, in seconds, applied to a NACK-ed message. It determines how soon the message becomes visible to other consumers again.

Public Class Methods

new(adapter, queue_url, &block)
# File lib/mimi/messaging/sqs_sns/consumer.rb, line 13
def initialize(adapter, queue_url, &block)
  @stop_requested = false
  Mimi::Messaging.log "Starting consumer for: #{queue_url}"
  @consumer_thread = Thread.new do
    until @stop_requested
      read_and_process_message(adapter, queue_url, block)
    end
    Mimi::Messaging.log "Stopping consumer for: #{queue_url}"
  end
end

Public Instance Methods

signal_stop()

Requests the consumer to stop, without waiting for it to finish.

# File lib/mimi/messaging/sqs_sns/consumer.rb, line 26
def signal_stop
  @stop_requested = true
end
stop()

Requests the consumer to stop and waits until its thread has finished.

# File lib/mimi/messaging/sqs_sns/consumer.rb, line 32
def stop
  @stop_requested = true
  @consumer_thread.join
end
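
The difference between signal_stop and stop matters most when several consumers are shut down together. The following is a minimal sketch, not the library's code: SketchConsumer and its alive? helper are hypothetical stand-ins that mirror the stop flag and thread join shown above, polling an in-memory Queue instead of SQS. Because the flag is only checked between loop iterations, signalling every consumer first lets all loops wind down in parallel before any join blocks.

```ruby
# SketchConsumer is a hypothetical stand-in for the consumer documented here.
# It mirrors the stop flag and thread join, but reads from an in-memory Queue.
class SketchConsumer
  def initialize(source, &block)
    @stop_requested = false
    @consumer_thread = Thread.new do
      until @stop_requested
        begin
          # Non-blocking pop raises ThreadError when the queue is empty.
          block.call(source.pop(true))
        rescue ThreadError
          sleep 0.01 # nothing to read yet; re-check the stop flag shortly
        end
      end
    end
  end

  # Sets the flag and returns immediately.
  def signal_stop
    @stop_requested = true
  end

  # Sets the flag and blocks until the loop has exited.
  def stop
    @stop_requested = true
    @consumer_thread.join
  end

  # Hypothetical helper, used only to observe the thread state in this sketch.
  def alive?
    @consumer_thread.alive?
  end
end

source = Queue.new
seen = Queue.new
consumers = Array.new(3) { SketchConsumer.new(source) { |item| seen << item } }
5.times { |i| source << i }
sleep 0.01 until seen.size == 5

# Phase 1: ask every consumer to stop, without blocking on any of them.
consumers.each(&:signal_stop)
# Phase 2: wait for each thread; the loops are already winding down.
consumers.each(&:stop)
```

With the real consumer, each loop iteration may block inside the SQS read for up to the configured read timeout, so stop can take that long to return.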

Private Instance Methods

ack_message(adapter, queue_url, msg)

ACK-ing the message indicates that it was processed successfully and removes it from the queue.

# File lib/mimi/messaging/sqs_sns/consumer.rb, line 98
def ack_message(adapter, queue_url, msg)
  adapter.sqs_client.delete_message(
    queue_url: queue_url, receipt_handle: msg.receipt_handle
  )
end
nack_message(adapter, queue_url, msg)

NACK-ing the message indicates a failure to process it. The message becomes visible to other consumers again after NACK_VISIBILITY_TIMEOUT seconds. If the NACK request itself fails, the error is logged and the message is left as it is.

# File lib/mimi/messaging/sqs_sns/consumer.rb, line 107
def nack_message(adapter, queue_url, msg)
  adapter.sqs_client.change_message_visibility(
    queue_url: queue_url,
    receipt_handle: msg.receipt_handle,
    visibility_timeout: NACK_VISIBILITY_TIMEOUT
  )
rescue StandardError => e
  Mimi::Messaging.logger&.error(
    "#{self.class}: failed to NACK message from: #{queue_url}," \
    " error: (#{e.class}) #{e}"
  )
  # NOTE: error is recovered and the message is neither ACKed nor NACKed
end
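process_message(adapter, queue_url, message, block)

Invokes the block with the message and ACKs the message on success. If the block raises Mimi::Messaging::NACK, the message is NACK-ed. Any other StandardError is logged, and the message is neither ACKed nor NACKed.
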
# File lib/mimi/messaging/sqs_sns/consumer.rb, line 81
def process_message(adapter, queue_url, message, block)
  block.call(message)
  ack_message(adapter, queue_url, message)
rescue Mimi::Messaging::NACK
  Mimi::Messaging.log "NACK-ing message from: #{queue_url}"
  nack_message(adapter, queue_url, message)
rescue StandardError => e
  Mimi::Messaging.logger&.error(
    "#{self.class}: failed to process message from: #{queue_url}," \
    " error: (#{e.class}) #{e}"
  )
  # NOTE: error is recovered and the message is neither ACKed nor NACKed
end
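
The three outcomes of process_message can be illustrated without AWS. This is a sketch under stated assumptions: NackSignal stands in for Mimi::Messaging::NACK (its superclass here is a guess), RecordingClient stands in for the SQS client and only records calls, and NACK_TIMEOUT is an assumed value, since the real constant's value is not shown on this page.

```ruby
# Hypothetical stand-ins; none of these names exist in the library.
class NackSignal < StandardError; end # mirrors Mimi::Messaging::NACK (assumed superclass)

# Records the two SQS client calls used by ack_message and nack_message.
class RecordingClient
  attr_reader :calls

  def initialize
    @calls = []
  end

  def delete_message(queue_url:, receipt_handle:)
    @calls << [:delete_message, receipt_handle]
  end

  def change_message_visibility(queue_url:, receipt_handle:, visibility_timeout:)
    @calls << [:change_message_visibility, receipt_handle, visibility_timeout]
  end
end

Message = Struct.new(:receipt_handle, :body)
NACK_TIMEOUT = 1 # assumed value for illustration only

# Same rescue ordering as process_message: the NACK signal is handled
# before the generic StandardError branch.
def process(client, queue_url, message, handler)
  handler.call(message)
  client.delete_message(queue_url: queue_url, receipt_handle: message.receipt_handle)
  :acked
rescue NackSignal
  client.change_message_visibility(
    queue_url: queue_url,
    receipt_handle: message.receipt_handle,
    visibility_timeout: NACK_TIMEOUT
  )
  :nacked
rescue StandardError
  :left_in_flight # neither ACKed nor NACKed
end

client = RecordingClient.new
url = "https://example.invalid/queue" # placeholder, not a real queue URL
results = [
  process(client, url, Message.new("h1", "ok"), ->(m) { m.body }),
  process(client, url, Message.new("h2", "retry"), ->(_m) { raise NackSignal }),
  process(client, url, Message.new("h3", "boom"), ->(_m) { raise ArgumentError, "bad" })
]
```

In the third case no client call is made at all. With SQS, a received message that is not deleted becomes visible again once the queue's own visibility timeout expires, so it is eventually redelivered.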
read_and_process_message(adapter, queue_url, block)

A method invoked in a loop to read/wait for a message from the associated queue and process it.

@param adapter [Mimi::Messaging::SQS_SNS::Adapter]
@param queue_url [String]
@param block [Proc] a block to be invoked when a message is received

# File lib/mimi/messaging/sqs_sns/consumer.rb, line 46
def read_and_process_message(adapter, queue_url, block)
  message = read_message(adapter, queue_url)
  return unless message

  Mimi::Messaging.log "Read message from: #{queue_url}"
  begin
    adapter.worker_pool.post do
      process_message(adapter, queue_url, message, block)
    end
  rescue Concurrent::RejectedExecutionError
    # the backlog is overflown, put the message back
    Mimi::Messaging.log "Worker pool backlog is full, nack-ing the message " \
      "(workers:#{adapter.worker_pool.length}, backlog:#{adapter.worker_pool.queue_length})"
    nack_message(adapter, queue_url, message)
  end
rescue StandardError => e
  Mimi::Messaging.logger&.error(
    "#{self.class}: failed to read or process message from: #{queue_url}," \
    " error: (#{e.class}) #{e}"
  )
end
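
The backpressure path in read_and_process_message (post to the pool, NACK on rejection) can be sketched with the standard library alone. TinyPool and PoolFull below are hypothetical: PoolFull stands in for Concurrent::RejectedExecutionError, and TinyPool is a bounded backlog built on SizedQueue with no worker threads, so the outcome is deterministic. It is not how the adapter's worker pool is implemented.

```ruby
# Hypothetical stand-in for Concurrent::RejectedExecutionError.
class PoolFull < StandardError; end

# A bounded backlog with no worker threads; jobs run when run_all is called.
class TinyPool
  def initialize(backlog:)
    @jobs = SizedQueue.new(backlog)
  end

  # Non-blocking push: SizedQueue raises ThreadError when it is full.
  def post(&job)
    @jobs.push(job, true)
  rescue ThreadError
    raise PoolFull
  end

  # Drains the backlog on the calling thread.
  def run_all
    @jobs.pop.call until @jobs.empty?
  end
end

# Mirrors the shape of read_and_process_message: hand the message to the
# pool, and "NACK" it (here: record it) when the backlog is full.
def dispatch(pool, message, processed, nacked)
  pool.post { processed << message }
rescue PoolFull
  nacked << message
end

pool = TinyPool.new(backlog: 2)
processed = []
nacked = []
%w[m1 m2 m3].each { |m| dispatch(pool, m, processed, nacked) }
pool.run_all
```

The third message is rejected because the backlog holds only two jobs. In the consumer, that message would be NACK-ed and offered to another consumer after the NACK visibility timeout.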
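read_message(adapter, queue_url)

Reads at most one message from the queue, long-polling for up to adapter.options[:mq_aws_sqs_read_timeout] seconds. Returns the message, or nil if none arrived. Raises Mimi::Messaging::ConnectionError if more than one message is returned.
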
# File lib/mimi/messaging/sqs_sns/consumer.rb, line 68
def read_message(adapter, queue_url)
  result = adapter.sqs_client.receive_message(
    queue_url: queue_url,
    max_number_of_messages: 1,
    wait_time_seconds: adapter.options[:mq_aws_sqs_read_timeout],
    message_attribute_names: ["All"]
  )
  return nil if result.messages.count == 0
  return result.messages.first if result.messages.count == 1

  raise Mimi::Messaging::ConnectionError, "Unexpected number of messages read"
end