class DispatchRider::QueueServices::AwsSqs

Attributes

visibility_timeout [R] (read-only attribute)

Public Instance Methods

assign_storage(attrs)
# File lib/dispatch-rider/queue_services/aws_sqs.rb, line 16
# Resolves the SQS queue backing this service and records its visibility
# timeout.
#
# The queue is identified by attrs[:name] (looked up through ListQueues) or,
# when no name is given, directly by attrs[:url]. Before the queue resource is
# built, set_visibility_timeout is called with the resolved URL.
#
# attrs - Hash of options; only :name and :url are read.
#
# Returns an Aws::SQS::Queue bound to the resolved URL and the client created
# here.
# Raises RecordInvalid when neither :name nor :url is present, or when no
# queue can be found for :name.
def assign_storage(attrs)
  sqs = Aws::SQS::Client.new(logger: nil)
  name = attrs[:name]

  url =
    if name.present?
      candidates = sqs.list_queues(queue_name_prefix: name).queue_urls
      # ListQueues matches on a prefix, so asking for "orders" can also return
      # "orders-dlq". Prefer the queue whose name is exactly the requested one
      # and only fall back to the first prefix match when there is none.
      candidates.find { |candidate| candidate.to_s.split("/").last == name.to_s } || candidates.first
    elsif attrs[:url].present?
      attrs[:url]
    else
      raise RecordInvalid.new(self, ["Either name or url have to be specified"])
    end

  # Fail with a descriptive error instead of handing a nil URL to the SDK.
  raise RecordInvalid.new(self, ["No SQS queue found for name #{name.inspect}"]) if url.nil?

  set_visibility_timeout(sqs, url)
  Aws::SQS::Queue.new(url: url, client: sqs)
end
construct_message_from(item)
# File lib/dispatch-rider/queue_services/aws_sqs.rb, line 51
# Turns a raw queue item into a deserialized message.
#
# The body is pulled out of the item with MessageBodyExtractor and the result
# is handed to deserialize.
#
# item - raw item as received from the queue.
#
# Returns whatever deserialize returns for the extracted body.
def construct_message_from(item)
  body = MessageBodyExtractor.new(item).extract
  deserialize(body)
end
delete(item)
# File lib/dispatch-rider/queue_services/aws_sqs.rb, line 55
# Deletes the given item by calling its own delete method.
#
# item - object responding to delete (presumably a received SQS message;
#        confirm against callers).
#
# Returns whatever item.delete returns.
def delete(item)
  item.delete
end
insert(item)
# File lib/dispatch-rider/queue_services/aws_sqs.rb, line 47
# Sends the given item to the queue via queue.send_message.
#
# item - passed to send_message unchanged; presumably already in the shape
#        that call expects (confirm against callers).
#
# Returns whatever queue.send_message returns.
def insert(item)
  queue.send_message(item)
end
pop() { |obj| ... }
# File lib/dispatch-rider/queue_services/aws_sqs.rb, line 30
# Receives at most one message from the queue and yields it to the caller's
# block.
#
# The raw item is wrapped in an SqsReceivedMessage and the block is run inside
# visibility_timeout_shield. When the block returns a truthy value the raw
# item is deleted (up to three attempts via Retriable). When the block returns
# a falsy value, AbortExecution is raised and rescued here, so the item is not
# deleted by this method.
#
# Yields the SqsReceivedMessage built for the received item.
#
# Returns the result of the delete call on success, or nil when nothing was
# received or the block returned a falsy value.
def pop
  raw_item = queue.receive_messages(max_number_of_messages: 1).first
  return unless raw_item.present?

  received = SqsReceivedMessage.new(construct_message_from(raw_item), raw_item, queue, visibility_timeout)

  visibility_timeout_shield(received) do
    handled = yield(received)
    raise AbortExecution, "false received from handler" unless handled

    received
  end

  Retriable.retriable(tries: 3) { raw_item.delete }
rescue AbortExecution
  # Nothing left to do: the abort only exists to skip the delete above.
  nil
end
size()
# File lib/dispatch-rider/queue_services/aws_sqs.rb, line 59
# Reports the approximate number of messages currently in the queue.
#
# If the queue object exposes approximate_number_of_messages directly, that
# value is returned unchanged. Otherwise (as with the Aws::SQS::Queue resource
# built in assign_storage, which has no such method) the
# ApproximateNumberOfMessages attribute is requested through the queue's
# client and converted to an Integer.
#
# Returns the approximate message count.
def size
  return queue.approximate_number_of_messages if queue.respond_to?(:approximate_number_of_messages)

  response = queue.client.get_queue_attributes(
    queue_url: queue.url,
    attribute_names: ["ApproximateNumberOfMessages"]
  )
  # Queue attributes come back in a string-keyed hash; to_i normalizes the
  # value to a number.
  response.attributes["ApproximateNumberOfMessages"].to_i
end

Private Instance Methods

set_visibility_timeout(client,url)
# File lib/dispatch-rider/queue_services/aws_sqs.rb, line 67
# Looks up the queue's VisibilityTimeout attribute and stores it in
# @visibility_timeout.
#
# client - object responding to get_queue_attributes (an SQS client).
# url    - URL of the queue to inspect.
#
# Returns the stored value exactly as the attribute lookup returned it
# (presumably a String, as no conversion is applied here).
def set_visibility_timeout(client, url)
  queue_attributes = client.get_queue_attributes(
    queue_url: url,
    attribute_names: ["VisibilityTimeout"]
  ).attributes
  @visibility_timeout = queue_attributes["VisibilityTimeout"]
end
visibility_timeout_shield(message) { || ... }
# File lib/dispatch-rider/queue_services/aws_sqs.rb, line 72
# Runs the given block and afterwards checks whether handling the message took
# longer than its total timeout.
#
# The check lives in an ensure clause, so it runs whether the block returns
# or raises. When the elapsed time exceeds the timeout,
# VisibilityTimeoutExceeded is raised from there and takes the place of any
# error the block raised.
#
# message - object providing start_time, total_timeout, subject and body.
#
# Returns the block's value when the timeout was not exceeded.
# Raises VisibilityTimeoutExceeded when the elapsed time is greater than
# message.total_timeout.
def visibility_timeout_shield(message)
  yield
ensure
  elapsed = Time.now - message.start_time
  allowed = message.total_timeout
  # No `return` here on purpose: returning from ensure would swallow errors.
  if elapsed > allowed
    raise VisibilityTimeoutExceeded,
          "message: #{message.subject}, #{message.body.inspect} took #{elapsed} seconds while the timeout was #{allowed}"
  end
end