class SqsConsumer::Consumer
Constants
- RECEIVE_MESSAGES_DEFAULT
Public Class Methods
new(endpoint:, region:, work_queue_name:, throttle_sleep_duration: 0.0, receive_messages_options: {}, message_filter: nil, processor_action: nil, success_result: nil, error_callback: nil)
click to toggle source
# File lib/sqs_consumer/consumer.rb, line 8 def initialize(endpoint:, region:, work_queue_name:, throttle_sleep_duration: 0.0, receive_messages_options: {}, message_filter: nil, processor_action: nil, success_result: nil, error_callback: nil) @client_config = { endpoint: endpoint, region: region } @work_queue_name = work_queue_name @throttle_sleep_duration = throttle_sleep_duration @logger = PaulBunyan.logger @receive_messages_options = receive_messages_options.merge(RECEIVE_MESSAGES_DEFAULT) @message_filter = message_filter @processor_action = processor_action @success_result = success_result @error_callback = error_callback @running = false end
Public Instance Methods
create_queue()
click to toggle source
# File lib/sqs_consumer/consumer.rb, line 37 def create_queue sqs_client = Aws::SQS::Client.new(@client_config) sqs_queue_url = sqs_client.get_queue_url(queue_name: @work_queue_name)[:queue_url] Aws::SQS::Queue.new(sqs_queue_url, sqs_client.config) end
run()
click to toggle source
# File lib/sqs_consumer/consumer.rb, line 43 def run @running = true queue = create_queue loop do sqs_messages = queue.receive_messages(@receive_messages_options) @logger.debug { "Got #{sqs_messages.size} messages from SQS." } unless sqs_messages.size == 0 sqs_messages.each do |sqs_message| begin if should_process? sqs_message result = @processor_action&.call(sqs_message) if @success_result.nil? || @success_result == result sqs_message.delete end end rescue => e @error_callback&.call(e, sqs_message) end end break unless running? sleep @throttle_sleep_duration unless @throttle_sleep_duration.zero? end end
running?()
click to toggle source
# File lib/sqs_consumer/consumer.rb, line 33 def running? @running end
should_process?(sqs_message)
click to toggle source
# File lib/sqs_consumer/consumer.rb, line 25 def should_process?(sqs_message) @message_filter.nil? || @message_filter.call(sqs_message.message_attributes) end
stop()
click to toggle source
# File lib/sqs_consumer/consumer.rb, line 29 def stop @running = false end