class FakeSQS::Queue

Constants

VISIBILITY_TIMEOUT

Attributes

arn[R]
message_factory[R]
name[R]
queue_attributes[R]

Public Class Methods

new(options = {}) click to toggle source
# File lib/fake_sqs/queue.rb, line 18
def initialize(options = {})
  @message_factory = options.fetch(:message_factory)

  @name = options.fetch("QueueName")
  @arn = options.fetch("Arn") { "arn:aws:sqs:us-east-1:#{SecureRandom.hex}:#{@name}" }
  @queue_attributes = options.fetch("Attributes") { {} }
  @lock = Monitor.new
  reset
end

Public Instance Methods

add_queue_attributes(attrs) click to toggle source
# File lib/fake_sqs/queue.rb, line 36
def add_queue_attributes(attrs)
  queue_attributes.merge!(attrs)
end
attributes() click to toggle source
# File lib/fake_sqs/queue.rb, line 40
def attributes
  queue_attributes.merge(
    "QueueArn" => arn,
    "ApproximateNumberOfMessages" => published_size,
    "ApproximateNumberOfMessagesNotVisible" => @messages_in_flight.size,
  )
end
change_message_visibility(receipt, visibility) click to toggle source
# File lib/fake_sqs/queue.rb, line 111
def change_message_visibility(receipt, visibility)
  with_lock do
    message = @messages_in_flight[receipt]
    raise MessageNotInflight unless message

    if visibility == 0
      message.expire!
      @messages[receipt] = message
      @messages_in_flight.delete(receipt)
    else
      message.expire_at(visibility)
    end
  end
end
check_message_for_dlq(message, options={}) click to toggle source
# File lib/fake_sqs/queue.rb, line 126
def check_message_for_dlq(message, options={})
  if redrive_policy = queue_attributes["RedrivePolicy"] && JSON.parse(queue_attributes["RedrivePolicy"])
    dlq = options[:queues].list.find{|queue| queue.arn == redrive_policy["deadLetterTargetArn"]}
    if dlq && message.approximate_receive_count >= redrive_policy["maxReceiveCount"].to_i
      dlq.send_message(message: message)
      message.expire!
      true
    end
  end
end
default_visibility_timeout() click to toggle source
# File lib/fake_sqs/queue.rb, line 87
def default_visibility_timeout
  if value = attributes['VisibilityTimeout']
    value.to_i
  else
    VISIBILITY_TIMEOUT
  end
end
delete_message(receipt) click to toggle source
# File lib/fake_sqs/queue.rb, line 137
def delete_message(receipt)
  with_lock do
    @messages.delete(receipt)
    @messages_in_flight.delete(receipt)
  end
end
expire() click to toggle source
# File lib/fake_sqs/queue.rb, line 152
def expire
  with_lock do
    @messages.merge!(@messages_in_flight)
    @messages_in_flight.clear()
    reset_messages_in_flight
  end
end
messages() click to toggle source
# File lib/fake_sqs/queue.rb, line 167
def messages
  @messages_view
end
messages_in_flight() click to toggle source
# File lib/fake_sqs/queue.rb, line 171
def messages_in_flight
  @messages_in_flight_view
end
published_size() click to toggle source
# File lib/fake_sqs/queue.rb, line 179
def published_size
  @messages.values.select { |m| m.published? }.size
end
receive_message(options = {}) click to toggle source
# File lib/fake_sqs/queue.rb, line 58
def receive_message(options = {})
  amount = Integer options.fetch("MaxNumberOfMessages") { "1" }
  visibility_timeout = Integer options.fetch("VisibilityTimeout") { default_visibility_timeout }

  fail ReadCountOutOfRange, amount if amount > 10

  return {} if @messages.empty?

  result = {}

  with_lock do
    actual_amount = amount > published_size ? published_size : amount
    published_messages = @messages.values.select { |m| m.published? }

    actual_amount.times do
      message = published_messages.delete_at(rand(published_size))
      @messages.delete(message.receipt)
      unless check_message_for_dlq(message, options)
        message.expire_at(visibility_timeout)
        message.receive!
        @messages_in_flight[message.receipt] = message
        result[message.receipt] = message
      end
    end
  end

  result
end
reset() click to toggle source
# File lib/fake_sqs/queue.rb, line 144
def reset
  with_lock do
    @messages = {}
    @messages_view = FakeSQS::CollectionView.new(@messages)
    reset_messages_in_flight
  end
end
reset_messages_in_flight() click to toggle source
# File lib/fake_sqs/queue.rb, line 160
def reset_messages_in_flight
  with_lock do
    @messages_in_flight = {}
    @messages_in_flight_view = FakeSQS::CollectionView.new(@messages_in_flight)
  end
end
send_message(options = {}) click to toggle source
# File lib/fake_sqs/queue.rb, line 48
def send_message(options = {})
  with_lock do
    message = options.fetch(:message){ message_factory.new(options) }
    if message
      @messages[message.receipt] = message
    end
    message
  end
end
size() click to toggle source
# File lib/fake_sqs/queue.rb, line 175
def size
  @messages.size
end
timeout_messages!() click to toggle source
# File lib/fake_sqs/queue.rb, line 95
def timeout_messages!
  with_lock do
    expired = @messages_in_flight.inject({}) do |memo,(receipt,message)|
      if message.expired?
        memo[receipt] = message
      end
      memo
    end
    expired.each do |receipt,message|
      message.expire!
      @messages[receipt] = message
      @messages_in_flight.delete(receipt)
    end
  end
end
to_yaml() click to toggle source
# File lib/fake_sqs/queue.rb, line 28
def to_yaml
  {
    "QueueName" => name,
    "Arn" => arn,
    "Attributes" => queue_attributes,
  }
end
with_lock() { || ... } click to toggle source
# File lib/fake_sqs/queue.rb, line 183
def with_lock
  @lock.synchronize do
    yield
  end
end