class Shoryuken::Fetcher
Constants
- FETCH_LIMIT
Public Class Methods
new(group)
click to toggle source
# File lib/shoryuken/fetcher.rb, line 7 def initialize(group) @group = group end
Public Instance Methods
fetch(queue, limit)
click to toggle source
# File lib/shoryuken/fetcher.rb, line 11 def fetch(queue, limit) fetch_with_auto_retry(3) do started_at = Time.now logger.debug { "Looking for new messages in #{queue}" } sqs_msgs = Array(receive_messages(queue, limit)) logger.debug { "Found #{sqs_msgs.size} messages for #{queue.name}" } unless sqs_msgs.empty? logger.debug { "Fetcher for #{queue} completed in #{elapsed(started_at)} ms" } sqs_msgs end end
Private Instance Methods
batched_queue?(queue)
click to toggle source
# File lib/shoryuken/fetcher.rb, line 83 def batched_queue?(queue) Shoryuken.worker_registry.batch_receive_messages?(queue.name) end
fetch_with_auto_retry(max_attempts) { || ... }
click to toggle source
# File lib/shoryuken/fetcher.rb, line 28 def fetch_with_auto_retry(max_attempts) attempts = 0 begin yield rescue => ex # Tries to auto retry connectivity errors raise if attempts >= max_attempts attempts += 1 logger.debug { "Retrying fetch attempt #{attempts} for #{ex.message}" } sleep((1..5).to_a.sample) retry end end
max_number_of_messages(shoryuken_queue, limit, options)
click to toggle source
# File lib/shoryuken/fetcher.rb, line 61 def max_number_of_messages(shoryuken_queue, limit, options) # For FIFO queues we want to make sure we process one message per group at a time # if we set max_number_of_messages greater than 1, # SQS may return more than one message for the same message group # since Shoryuken uses threads, it will try to process more than one message at once # > The message group ID is the tag that specifies that a message belongs to a specific message group. # > Messages that belong to the same message group are always processed one by one, # > in a strict order relative to the message group # > (however, messages that belong to different message groups might be processed out of order). # > https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagegroupid-property.html limit = 1 if shoryuken_queue.fifo? && !batched_queue?(shoryuken_queue) [limit, FETCH_LIMIT, options[:max_number_of_messages]].compact.min end
receive_messages(queue, limit)
click to toggle source
# File lib/shoryuken/fetcher.rb, line 47 def receive_messages(queue, limit) options = receive_options(queue) shoryuken_queue = Shoryuken::Client.queues(queue.name) options[:max_number_of_messages] = max_number_of_messages(shoryuken_queue, limit, options) options[:message_attribute_names] = %w[All] options[:attribute_names] = %w[All] options.merge!(queue.options) shoryuken_queue.receive_messages(options) end
receive_options(queue)
click to toggle source
# File lib/shoryuken/fetcher.rb, line 76 def receive_options(queue) options = Shoryuken.sqs_client_receive_message_opts[queue.name] options ||= Shoryuken.sqs_client_receive_message_opts[@group] options.to_h.dup end