class Shoryuken::Queue
Constants
- FIFO_ATTR
- MESSAGE_GROUP_ID
- VISIBILITY_TIMEOUT_ATTR
Attributes
client[RW]
name[RW]
url[RW]
Public Class Methods
new(client, name_or_url_or_arn)
click to toggle source
# File lib/shoryuken/queue.rb, line 11 def initialize(client, name_or_url_or_arn) self.client = client set_name_and_url(name_or_url_or_arn) end
Public Instance Methods
delete_messages(options)
click to toggle source
# File lib/shoryuken/queue.rb, line 23 def delete_messages(options) client.delete_message_batch( options.merge(queue_url: url) ).failed.any? do |failure| logger.error do "Could not delete #{failure.id}, code: '#{failure.code}', message: '#{failure.message}', sender_fault: #{failure.sender_fault}" end end end
fifo?()
click to toggle source
# File lib/shoryuken/queue.rb, line 49 def fifo? # Make sure the memoization work with boolean to avoid multiple calls to SQS # see https://github.com/phstc/shoryuken/pull/529 return @_fifo if defined?(@_fifo) @_fifo = queue_attributes.attributes[FIFO_ATTR] == 'true' @_fifo end
receive_messages(options)
click to toggle source
# File lib/shoryuken/queue.rb, line 45 def receive_messages(options) client.receive_message(options.merge(queue_url: url)).messages.map { |m| Message.new(client, self, m) } end
send_message(options)
click to toggle source
# File lib/shoryuken/queue.rb, line 33 def send_message(options) options = sanitize_message!(options).merge(queue_url: url) Shoryuken.client_middleware.invoke(options) do client.send_message(options) end end
send_messages(options)
click to toggle source
# File lib/shoryuken/queue.rb, line 41 def send_messages(options) client.send_message_batch(sanitize_messages!(options).merge(queue_url: url)) end
visibility_timeout()
click to toggle source
# File lib/shoryuken/queue.rb, line 16 def visibility_timeout # Always lookup for the latest visibility when cache is disabled # setting it to nil, forces re-lookup @_visibility_timeout = nil unless Shoryuken.cache_visibility_timeout? @_visibility_timeout ||= queue_attributes.attributes[VISIBILITY_TIMEOUT_ATTR].to_i end
Private Instance Methods
add_fifo_attributes!(options)
click to toggle source
# File lib/shoryuken/queue.rb, line 129 def add_fifo_attributes!(options) return unless fifo? options[:message_group_id] ||= MESSAGE_GROUP_ID options[:message_deduplication_id] ||= Digest::SHA256.hexdigest(options[:message_body].to_s) options end
arn_to_url(arn_str)
click to toggle source
# File lib/shoryuken/queue.rb, line 75 def arn_to_url(arn_str) *, region, account_id, resource = arn_str.split(':') required = [region, account_id, resource].map(&:to_s) valid = required.none?(&:empty?) abort "Invalid ARN: #{arn_str}. A valid ARN must include: region, account_id and resource." unless valid "https://sqs.#{region}.amazonaws.com/#{account_id}/#{resource}" end
initialize_fifo_attribute()
click to toggle source
# File lib/shoryuken/queue.rb, line 60 def initialize_fifo_attribute # calling fifo? will also initialize it fifo? end
queue_attributes()
click to toggle source
# File lib/shoryuken/queue.rb, line 109 def queue_attributes # Note: Retrieving all queue attributes as requesting `FifoQueue` on non-FIFO queue raises error. # See issue: https://github.com/aws/aws-sdk-ruby/issues/1350 client.get_queue_attributes(queue_url: url, attribute_names: ['All']) end
sanitize_message!(options)
click to toggle source
# File lib/shoryuken/queue.rb, line 138 def sanitize_message!(options) options = { message_body: options } if options.is_a?(String) if (body = options[:message_body]).is_a?(Hash) options[:message_body] = JSON.dump(body) end add_fifo_attributes!(options) options end
sanitize_messages!(options)
click to toggle source
# File lib/shoryuken/queue.rb, line 115 def sanitize_messages!(options) if options.is_a?(Array) entries = options.map.with_index do |m, index| { id: index.to_s }.merge(m.is_a?(Hash) ? m : { message_body: m }) end options = { entries: entries } end options[:entries].each(&method(:sanitize_message!)) options end
set_by_name(name)
click to toggle source
# File lib/shoryuken/queue.rb, line 65 def set_by_name(name) # rubocop:disable Naming/AccessorMethodName self.name = name self.url = client.get_queue_url(queue_name: name).queue_url end
set_by_url(url)
click to toggle source
# File lib/shoryuken/queue.rb, line 70 def set_by_url(url) # rubocop:disable Naming/AccessorMethodName self.name = url.split('/').last self.url = url end
set_name_and_url(name_or_url_or_arn)
click to toggle source
# File lib/shoryuken/queue.rb, line 86 def set_name_and_url(name_or_url_or_arn) # rubocop:disable Naming/AccessorMethodName if name_or_url_or_arn.include?('://') set_by_url(name_or_url_or_arn) # anticipate the fifo? checker for validating the queue URL initialize_fifo_attribute return end if name_or_url_or_arn.start_with?('arn:') url = arn_to_url(name_or_url_or_arn) set_by_url(url) # anticipate the fifo? checker for validating the queue URL initialize_fifo_attribute return end set_by_name(name_or_url_or_arn) rescue Aws::Errors::NoSuchEndpointError, Aws::SQS::Errors::NonExistentQueue => e raise e, "The specified queue #{name_or_url_or_arn} does not exist." end