class DispatchRider::QueueServices::AwsSqs

Public Instance Methods

assign_storage(attrs) click to toggle source
# File lib/dispatch-rider/queue_services/aws_sqs.rb, line 12
# Resolves the SQS queue object that backs this queue service.
#
# Builds an AWS::SQS client (logging disabled) for attrs[:region] and looks
# the queue up by attrs[:name] when present, otherwise by attrs[:url].
#
# @param attrs [Hash] options; the keys read here are :region, :name and :url
# @return [Object] whatever the SDK lookup returns
#   (presumably an AWS::SQS::Queue -- confirm against the SDK version in use)
# @raise [RecordInvalid] when neither :name nor :url is given
# @raise [AdapterNotFoundError] when a constant cannot be resolved, which is
#   what happens when the aws-sdk gem has not been loaded
def assign_storage(attrs)
  sqs = AWS::SQS.new(logger: nil, region: attrs[:region])
  if attrs[:name]
    sqs.queues.named(attrs[:name])
  elsif attrs[:url]
    sqs.queues[attrs[:url]]
  else
    raise RecordInvalid.new(self, ["Either name or url have to be specified"])
  end
rescue NameError => e
  # NoMethodError is a subclass of NameError. Let it propagate so that an
  # ordinary bug (for example attrs being nil) is not misreported as a
  # missing adapter; only unresolved names mean the SDK is absent.
  raise if e.is_a?(NoMethodError)

  raise AdapterNotFoundError.new(self.class.name, 'aws-sdk')
end
construct_message_from(item) click to toggle source
# File lib/dispatch-rider/queue_services/aws_sqs.rb, line 47
# Converts a raw queue item into a message: the body is pulled out of the
# item with MessageBodyExtractor and then handed to #deserialize.
#
# @param item [Object] raw item as received from the queue
# @return [Object] the result of #deserialize for the extracted body
def construct_message_from(item)
  extracted_body = MessageBodyExtractor.new(item).extract
  deserialize(extracted_body)
end
delete(item) click to toggle source
# File lib/dispatch-rider/queue_services/aws_sqs.rb, line 51
# Deletes the given item by delegating to the item's own #delete.
#
# @param item [Object] anything responding to #delete
#   (presumably a received SQS message -- confirm against callers)
# @return [Object] whatever item.delete returns
def delete(item)
  item.delete
end
insert(item) click to toggle source
# File lib/dispatch-rider/queue_services/aws_sqs.rb, line 43
# Enqueues the item by passing it unchanged to queue.send_message.
# +queue+ is defined outside this listing.
#
# @param item [Object] the payload to send
#   (presumably an already-serialized message body -- confirm against callers)
# @return [Object] whatever queue.send_message returns
def insert(item)
  queue.send_message(item)
end
pop() { |obj| ... } click to toggle source
# File lib/dispatch-rider/queue_services/aws_sqs.rb, line 27
# Receives one message from the queue and hands it to the given block.
#
# The raw message is wrapped in an SqsReceivedMessage and yielded while
# under #visibility_timeout_shield. When the block returns a truthy value the
# raw message is deleted (via Retriable, tries: 3). When the block returns a
# falsy value AbortExecution is raised and swallowed here, so the message is
# not deleted.
#
# @yield [message] the wrapped SqsReceivedMessage
# @yieldreturn [Boolean] truthy when the message was handled
# @return [Object, nil] the result of the retriable delete, or nil when no
#   message was present or the handler returned a falsy value
def pop
  received = queue.receive_message
  return unless received.present?

  message = SqsReceivedMessage.new(construct_message_from(received), received, queue)

  visibility_timeout_shield(message) do
    handled = yield(message)
    raise AbortExecution, "false received from handler" unless handled

    message
  end

  Retriable.retriable(tries: 3) { received.delete }
rescue AbortExecution
  # Nothing more to do: the abort only exists to break out of pop without
  # deleting the message.
  nil
end
size() click to toggle source
# File lib/dispatch-rider/queue_services/aws_sqs.rb, line 55
# Reports how many messages the queue holds, as given by
# queue.approximate_number_of_messages. +queue+ is defined outside this
# listing; going by the method name the figure is an estimate, not an exact count.
#
# @return [Object] whatever queue.approximate_number_of_messages returns
#   (presumably an Integer -- confirm against the SDK)
def size
  queue.approximate_number_of_messages
end

Private Instance Methods

visibility_timeout_shield(message) { || ... } click to toggle source
# File lib/dispatch-rider/queue_services/aws_sqs.rb, line 61
# Runs the given block and afterwards checks how long the message has been
# in flight.
#
# The check sits in an +ensure+ clause, so it runs whether the block returned
# normally or raised. If the time elapsed since message.start_time is greater
# than message.total_timeout, VisibilityTimeoutExceeded is raised (replacing
# any exception the block may have raised).
#
# @param message [Object] must respond to #start_time, #total_timeout,
#   #subject and #body
# @return [Object] the block's return value
# @raise [VisibilityTimeoutExceeded] when processing outlasted the timeout
def visibility_timeout_shield(message)
  yield
ensure
  elapsed = Time.now - message.start_time
  allowed = message.total_timeout
  if elapsed > allowed
    raise VisibilityTimeoutExceeded, "message: #{message.subject}, #{message.body.inspect} took #{elapsed} seconds while the timeout was #{allowed}"
  end
end