class Mimi::Messaging::SQS_SNS::Consumer
Message consumer for SQS queues
Constants
- NACK_VISIBILITY_TIMEOUT
(seconds) determines how soon the NACK-ed message becomes visible to other consumers
Public Class Methods
new(adapter, queue_url, &block)
# File lib/mimi/messaging/sqs_sns/consumer.rb, line 13
def initialize(adapter, queue_url, &block)
  @stop_requested = false
  Mimi::Messaging.log "Starting consumer for: #{queue_url}"
  @consumer_thread = Thread.new do
    while not @stop_requested
      read_and_process_message(adapter, queue_url, block)
    end
    Mimi::Messaging.log "Stopping consumer for: #{queue_url}"
  end
end
Public Instance Methods
signal_stop()
Requests the Consumer to stop, without actually waiting for it.
# File lib/mimi/messaging/sqs_sns/consumer.rb, line 26
def signal_stop
  @stop_requested = true
end
stop()
Requests the Consumer to stop AND waits until it does.
# File lib/mimi/messaging/sqs_sns/consumer.rb, line 32
def stop
  @stop_requested = true
  @consumer_thread.join
end
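The difference between signal_stop and stop can be illustrated with a minimal sketch of the same flag-and-thread pattern. LoopingWorker is a hypothetical stand-in written for this example, not part of the library, and the short sleep only approximates a blocking read from the queue.

```ruby
# Hypothetical stand-in for the consumer's flag-and-thread pattern.
# No AWS calls are made here.
class LoopingWorker
  attr_reader :iterations

  def initialize
    @stop_requested = false
    @iterations = 0
    @thread = Thread.new do
      until @stop_requested
        @iterations += 1
        sleep 0.01 # stands in for a blocking receive_message call
      end
    end
  end

  # Sets the flag and returns immediately; the loop exits on its next check.
  def signal_stop
    @stop_requested = true
  end

  # Sets the flag and blocks until the loop has exited.
  def stop
    @stop_requested = true
    @thread.join
  end

  def alive?
    @thread.alive?
  end
end

worker = LoopingWorker.new
sleep 0.05
worker.stop
puts worker.alive? # false: stop returns only after the thread has finished
```

When several consumers run side by side, calling signal_stop on all of them first and stop afterwards lets them wind down in parallel, so the total wait is closer to the slowest consumer than to the sum of all of them.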
Private Instance Methods
ack_message(adapter, queue_url, msg)
ACK-ing the message indicates that it was processed successfully and removes it from the queue.
# File lib/mimi/messaging/sqs_sns/consumer.rb, line 98
def ack_message(adapter, queue_url, msg)
  adapter.sqs_client.delete_message(
    queue_url: queue_url, receipt_handle: msg.receipt_handle
  )
end
nack_message(adapter, queue_url, msg)
NACK-ing the message indicates a failure to process the message. The message becomes visible to other consumers again after NACK_VISIBILITY_TIMEOUT seconds.
# File lib/mimi/messaging/sqs_sns/consumer.rb, line 107
def nack_message(adapter, queue_url, msg)
  adapter.sqs_client.change_message_visibility(
    queue_url: queue_url,
    receipt_handle: msg.receipt_handle,
    visibility_timeout: NACK_VISIBILITY_TIMEOUT
  )
rescue StandardError => e
  Mimi::Messaging.logger&.error(
    "#{self.class}: failed to NACK message from: #{queue_url}," \
    " error: (#{e.class}) #{e}"
  )
  # NOTE: error is recovered and the message is neither ACKed or NACKed
end
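As a rough illustration of what a NACK sends to SQS, the sketch below records the parameters passed to change_message_visibility. FakeSqsClient, Message, and the nack helper are hypothetical names introduced for this example, and the timeout value is an assumption, since the listing does not show what the constant is set to.

```ruby
# Hypothetical value; the source does not show the constant's actual setting.
NACK_VISIBILITY_TIMEOUT = 1

# Hypothetical recorder standing in for adapter.sqs_client.
class FakeSqsClient
  attr_reader :calls

  def initialize
    @calls = []
  end

  # Records the keyword arguments instead of calling AWS.
  def change_message_visibility(**params)
    @calls << params
  end
end

# Hypothetical message type exposing only the receipt handle.
Message = Struct.new(:receipt_handle)

# Mirrors the request that nack_message builds, without the error handling.
def nack(client, queue_url, msg)
  client.change_message_visibility(
    queue_url: queue_url,
    receipt_handle: msg.receipt_handle,
    visibility_timeout: NACK_VISIBILITY_TIMEOUT
  )
end

client = FakeSqsClient.new
nack(client, "https://example.invalid/queue", Message.new("handle-1"))
p client.calls.first[:visibility_timeout] # 1
```

The message is not deleted by this call; only the time until it can be received again changes.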
process_message(adapter, queue_url, message, block)
# File lib/mimi/messaging/sqs_sns/consumer.rb, line 81
def process_message(adapter, queue_url, message, block)
  block.call(message)
  ack_message(adapter, queue_url, message)
rescue Mimi::Messaging::NACK
  Mimi::Messaging.log "NACK-ing message from: #{queue_url}"
  nack_message(adapter, queue_url, message)
rescue StandardError => e
  Mimi::Messaging.logger&.error(
    "#{self.class}: failed to process message from: #{queue_url}," \
    " error: (#{e.class}) #{e}"
  )
  # NOTE: error is recovered and the message is neither ACKed or NACKed
end
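The three outcomes of process_message can be summarised in a small sketch. NackRequested is a hypothetical stand-in for Mimi::Messaging::NACK (its real superclass is not shown in this listing), and outcome_for is an illustrative helper that returns a symbol instead of talking to SQS.

```ruby
# Hypothetical stand-in for Mimi::Messaging::NACK.
class NackRequested < StandardError; end

# Runs the handler and reports which branch of the rescue chain was taken.
def outcome_for(message, &handler)
  handler.call(message)
  :acked # handler returned normally, so the message would be deleted
rescue NackRequested
  :nacked # handler asked for redelivery
rescue StandardError
  :left_in_flight # neither ACKed nor NACKed; only logged in the original
end

puts outcome_for("payload") { |m| m.upcase }             # acked
puts outcome_for("payload") { raise NackRequested }      # nacked
puts outcome_for("payload") { raise ArgumentError, "x" } # left_in_flight
```

In the third case the message stays invisible until the queue's own visibility timeout expires, at which point SQS makes it receivable again.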
read_and_process_message(adapter, queue_url, block)
A method invoked in a loop to read/wait for a message from the associated queue and process it
@param adapter [Mimi::Messaging::SQS_SNS::Adapter]
@param queue_url [String]
@param block [Proc] a block to be invoked when a message is received
# File lib/mimi/messaging/sqs_sns/consumer.rb, line 46
def read_and_process_message(adapter, queue_url, block)
  message = read_message(adapter, queue_url)
  return unless message
  Mimi::Messaging.log "Read message from: #{queue_url}"
  begin
    adapter.worker_pool.post do
      process_message(adapter, queue_url, message, block)
    end
  rescue Concurrent::RejectedExecutionError
    # the backlog is overflown, put the message back
    Mimi::Messaging.log "Worker pool backlog is full, nack-ing the message " \
      "(workers:#{adapter.worker_pool.length}, backlog:#{adapter.worker_pool.queue_length})"
    nack_message(adapter, queue_url, message)
  end
rescue StandardError => e
  Mimi::Messaging.logger&.error(
    "#{self.class}: failed to read or process message from: #{queue_url}," \
    " error: (#{e.class}) #{e}"
  )
end
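The backlog-overflow path above can be approximated with the standard library alone. BoundedBacklog is a hypothetical stand-in for adapter.worker_pool built on SizedQueue; it is not the concurrent-ruby executor the source uses, and its Rejected error only plays the role of Concurrent::RejectedExecutionError. Nothing drains the queue in this sketch, so the second post is guaranteed to overflow.

```ruby
# Hypothetical bounded backlog; stands in for adapter.worker_pool.
class BoundedBacklog
  # Plays the role of Concurrent::RejectedExecutionError in this sketch.
  class Rejected < StandardError; end

  def initialize(capacity)
    @queue = SizedQueue.new(capacity)
  end

  # Enqueues the task, or raises Rejected when the backlog is full.
  def post(&task)
    @queue.push(task, true) # non-blocking push raises ThreadError when full
  rescue ThreadError
    raise Rejected, "backlog is full"
  end
end

# Posts the message to the backlog; on rejection, hands it back by
# recording it in nacked (the original calls nack_message here).
def dispatch(backlog, message, nacked)
  backlog.post { message }
  :queued
rescue BoundedBacklog::Rejected
  nacked << message
  :nacked
end

backlog = BoundedBacklog.new(1)
nacked = []
puts dispatch(backlog, "first", nacked)  # queued
puts dispatch(backlog, "second", nacked) # nacked
```

Handing the message back instead of blocking keeps the read loop responsive and lets another consumer pick the message up once it becomes visible again.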
read_message(adapter, queue_url)
# File lib/mimi/messaging/sqs_sns/consumer.rb, line 68
def read_message(adapter, queue_url)
  result = adapter.sqs_client.receive_message(
    queue_url: queue_url,
    max_number_of_messages: 1,
    wait_time_seconds: adapter.options[:mq_aws_sqs_read_timeout],
    message_attribute_names: ["All"]
  )
  return nil if result.messages.count == 0
  return result.messages.first if result.messages.count == 1
  raise Mimi::Messaging::ConnectionError, "Unexpected number of messages read"
end