class Shoryuken::Queue

Constants

FIFO_ATTR
MESSAGE_GROUP_ID
VISIBILITY_TIMEOUT_ATTR

Attributes

client[RW]
name[RW]
url[RW]

Public Class Methods

new(client, name_or_url_or_arn) click to toggle source
# File lib/shoryuken/queue.rb, line 11
def initialize(client, name_or_url_or_arn)
  self.client = client
  set_name_and_url(name_or_url_or_arn)
end

Public Instance Methods

delete_messages(options) click to toggle source
# File lib/shoryuken/queue.rb, line 23
def delete_messages(options)
  client.delete_message_batch(
    options.merge(queue_url: url)
  ).failed.any? do |failure|
    logger.error do
      "Could not delete #{failure.id}, code: '#{failure.code}', message: '#{failure.message}', sender_fault: #{failure.sender_fault}"
    end
  end
end
fifo?() click to toggle source
# File lib/shoryuken/queue.rb, line 49
def fifo?
  # Make sure the memoization work with boolean to avoid multiple calls to SQS
  # see https://github.com/phstc/shoryuken/pull/529
  return @_fifo if defined?(@_fifo)

  @_fifo = queue_attributes.attributes[FIFO_ATTR] == 'true'
  @_fifo
end
receive_messages(options) click to toggle source
# File lib/shoryuken/queue.rb, line 45
def receive_messages(options)
  client.receive_message(options.merge(queue_url: url)).messages.map { |m| Message.new(client, self, m) }
end
send_message(options) click to toggle source
# File lib/shoryuken/queue.rb, line 33
def send_message(options)
  options = sanitize_message!(options).merge(queue_url: url)

  Shoryuken.client_middleware.invoke(options) do
    client.send_message(options)
  end
end
send_messages(options) click to toggle source
# File lib/shoryuken/queue.rb, line 41
def send_messages(options)
  client.send_message_batch(sanitize_messages!(options).merge(queue_url: url))
end
visibility_timeout() click to toggle source
# File lib/shoryuken/queue.rb, line 16
def visibility_timeout
  # Always lookup for the latest visibility when cache is disabled
  # setting it to nil, forces re-lookup
  @_visibility_timeout = nil unless Shoryuken.cache_visibility_timeout?
  @_visibility_timeout ||= queue_attributes.attributes[VISIBILITY_TIMEOUT_ATTR].to_i
end

Private Instance Methods

add_fifo_attributes!(options) click to toggle source
# File lib/shoryuken/queue.rb, line 129
def add_fifo_attributes!(options)
  return unless fifo?

  options[:message_group_id]         ||= MESSAGE_GROUP_ID
  options[:message_deduplication_id] ||= Digest::SHA256.hexdigest(options[:message_body].to_s)

  options
end
arn_to_url(arn_str) click to toggle source
# File lib/shoryuken/queue.rb, line 75
def arn_to_url(arn_str)
  *, region, account_id, resource = arn_str.split(':')

  required = [region, account_id, resource].map(&:to_s)
  valid = required.none?(&:empty?)

  abort "Invalid ARN: #{arn_str}. A valid ARN must include: region, account_id and resource." unless valid

  "https://sqs.#{region}.amazonaws.com/#{account_id}/#{resource}"
end
initialize_fifo_attribute() click to toggle source
# File lib/shoryuken/queue.rb, line 60
def initialize_fifo_attribute
  # calling fifo? will also initialize it
  fifo?
end
queue_attributes() click to toggle source
# File lib/shoryuken/queue.rb, line 109
def queue_attributes
  # Note: Retrieving all queue attributes as requesting `FifoQueue` on non-FIFO queue raises error.
  # See issue: https://github.com/aws/aws-sdk-ruby/issues/1350
  client.get_queue_attributes(queue_url: url, attribute_names: ['All'])
end
sanitize_message!(options) click to toggle source
# File lib/shoryuken/queue.rb, line 138
def sanitize_message!(options)
  options = { message_body: options } if options.is_a?(String)

  if (body = options[:message_body]).is_a?(Hash)
    options[:message_body] = JSON.dump(body)
  end

  add_fifo_attributes!(options)

  options
end
sanitize_messages!(options) click to toggle source
# File lib/shoryuken/queue.rb, line 115
def sanitize_messages!(options)
  if options.is_a?(Array)
    entries = options.map.with_index do |m, index|
      { id: index.to_s }.merge(m.is_a?(Hash) ? m : { message_body: m })
    end

    options = { entries: entries }
  end

  options[:entries].each(&method(:sanitize_message!))

  options
end
set_by_name(name) click to toggle source
# File lib/shoryuken/queue.rb, line 65
def set_by_name(name) # rubocop:disable Naming/AccessorMethodName
  self.name = name
  self.url  = client.get_queue_url(queue_name: name).queue_url
end
set_by_url(url) click to toggle source
# File lib/shoryuken/queue.rb, line 70
def set_by_url(url) # rubocop:disable Naming/AccessorMethodName
  self.name = url.split('/').last
  self.url  = url
end
set_name_and_url(name_or_url_or_arn) click to toggle source
# File lib/shoryuken/queue.rb, line 86
def set_name_and_url(name_or_url_or_arn) # rubocop:disable Naming/AccessorMethodName
  if name_or_url_or_arn.include?('://')
    set_by_url(name_or_url_or_arn)

    # anticipate the fifo? checker for validating the queue URL
    initialize_fifo_attribute
    return
  end

  if name_or_url_or_arn.start_with?('arn:')
    url = arn_to_url(name_or_url_or_arn)
    set_by_url(url)

    # anticipate the fifo? checker for validating the queue URL
    initialize_fifo_attribute
    return
  end

  set_by_name(name_or_url_or_arn)
rescue Aws::Errors::NoSuchEndpointError, Aws::SQS::Errors::NonExistentQueue => e
  raise e, "The specified queue #{name_or_url_or_arn} does not exist."
end