class FakeSQS::Queue
Constants
- VISIBILITY_TIMEOUT
Attributes
arn[R]
message_factory[R]
name[R]
queue_attributes[R]
Public Class Methods
new(options = {})
click to toggle source
# File lib/fake_sqs/queue.rb, line 18
#
# Builds a queue from an options hash.
#
# options - Hash read with +fetch+:
#           :message_factory - required; stored for later use by
#                              #send_message, which calls +new+ on it.
#           "QueueName"      - required queue name.
#           "Arn"            - optional; when absent an ARN of the form
#                              "arn:aws:sqs:us-east-1:<random hex>:<name>"
#                              is generated.
#           "Attributes"     - optional attribute hash, defaults to {}.
#
# A missing required key makes +fetch+ raise. The message stores are
# initialised last by calling #reset.
def initialize(options = {})
  @message_factory = options.fetch(:message_factory)
  @name = options.fetch("QueueName")
  # @name has to be assigned before this point: the generated ARN embeds it.
  @arn = options.fetch("Arn") do
    random_id = SecureRandom.hex
    "arn:aws:sqs:us-east-1:#{random_id}:#{@name}"
  end
  @queue_attributes = options.fetch("Attributes") { Hash.new }
  @lock = Monitor.new
  reset
end
Public Instance Methods
add_queue_attributes(attrs)
click to toggle source
# File lib/fake_sqs/queue.rb, line 36
#
# Merges +attrs+ into the stored queue attributes in place; values for keys
# that already exist are overwritten.
#
# Returns the (mutated) stored attribute hash.
def add_queue_attributes(attrs)
  queue_attributes.update(attrs)
end
attributes()
click to toggle source
# File lib/fake_sqs/queue.rb, line 40
#
# Returns a new hash combining the stored queue attributes with values
# computed at call time: the queue ARN, the number of published messages and
# the number of messages currently in flight. Computed keys win over stored
# keys of the same name; the stored attribute hash itself is not modified.
def attributes
  stored = queue_attributes
  computed = {
    "QueueArn" => arn,
    "ApproximateNumberOfMessages" => published_size,
    "ApproximateNumberOfMessagesNotVisible" => @messages_in_flight.size,
  }
  stored.merge(computed)
end
change_message_visibility(receipt, visibility)
click to toggle source
# File lib/fake_sqs/queue.rb, line 111
#
# Changes the visibility of the in-flight message identified by +receipt+.
#
# receipt    - key of the message in the in-flight store.
# visibility - when equal to 0 the message is expired and moved back to the
#              available message store; any other value is handed to the
#              message's +expire_at+ and the message stays in flight.
#
# Raises MessageNotInflight when no in-flight message has that receipt.
# The whole operation runs inside the queue lock.
def change_message_visibility(receipt, visibility)
  with_lock do
    in_flight = @messages_in_flight[receipt]
    raise MessageNotInflight unless in_flight

    if visibility != 0
      in_flight.expire_at(visibility)
    else
      in_flight.expire!
      @messages[receipt] = in_flight
      @messages_in_flight.delete(receipt)
    end
  end
end
check_message_for_dlq(message, options={})
click to toggle source
# File lib/fake_sqs/queue.rb, line 126
#
# Moves +message+ to this queue's dead-letter queue when it has been received
# at least as many times as the redrive policy allows.
#
# The policy is read from the "RedrivePolicy" queue attribute and parsed as
# JSON; its "deadLetterTargetArn" selects the target queue by ARN and its
# "maxReceiveCount" (converted with +to_i+) is the threshold.
#
# message - object responding to +approximate_receive_count+ and +expire!+.
# options - Hash; +:queues+ must respond to +list+ and yield queues that
#           respond to +arn+ and +send_message+. When +:queues+ is absent
#           (including the default empty hash) no dead-letter queue can be
#           looked up, so the message is left alone instead of failing on nil.
#
# Returns true when the message was forwarded and expired, nil otherwise
# (no policy, no registry, no matching queue, or threshold not reached).
def check_message_for_dlq(message, options = {})
  raw_policy = queue_attributes["RedrivePolicy"]
  return unless raw_policy

  redrive_policy = JSON.parse(raw_policy)
  return unless redrive_policy

  queues = options[:queues]
  return unless queues

  target_arn = redrive_policy["deadLetterTargetArn"]
  dlq = queues.list.find { |queue| queue.arn == target_arn }
  return unless dlq
  return unless message.approximate_receive_count >= redrive_policy["maxReceiveCount"].to_i

  dlq.send_message(message: message)
  message.expire!
  true
end
default_visibility_timeout()
click to toggle source
# File lib/fake_sqs/queue.rb, line 87
#
# Returns the visibility timeout to use when a receive request does not
# supply one: the stored 'VisibilityTimeout' queue attribute converted with
# +to_i+, or VISIBILITY_TIMEOUT when that attribute is not set.
#
# The stored attributes are read directly. #attributes would give the same
# value for this key, but only after building a merged hash and counting
# every published message, which is wasted work on each receive.
def default_visibility_timeout
  configured = queue_attributes['VisibilityTimeout']
  configured ? configured.to_i : VISIBILITY_TIMEOUT
end
delete_message(receipt)
click to toggle source
# File lib/fake_sqs/queue.rb, line 137
#
# Removes the message stored under +receipt+ from both the available store
# and the in-flight store, inside the queue lock.
#
# Returns whatever was removed from the in-flight store (nil when the
# receipt was not in flight).
def delete_message(receipt)
  with_lock do
    stores = [@messages, @messages_in_flight]
    stores.map { |store| store.delete(receipt) }.last
  end
end
expire()
click to toggle source
# File lib/fake_sqs/queue.rb, line 152
#
# Returns every in-flight message to the available store in one step.
#
# Inside the queue lock the in-flight entries are merged into the available
# store (overwriting entries with the same receipt), the existing in-flight
# hash is emptied, and #reset_messages_in_flight then installs a fresh
# in-flight store. The messages themselves are not touched.
#
# Returns the result of #reset_messages_in_flight.
def expire
  with_lock do
    in_flight = @messages_in_flight
    @messages.update(in_flight)
    in_flight.clear
    reset_messages_in_flight
  end
end
messages()
click to toggle source
# File lib/fake_sqs/queue.rb, line 167
#
# Returns the view object that #reset builds over the available message
# store, rather than the underlying hash itself.
def messages
  view = @messages_view
  view
end
messages_in_flight()
click to toggle source
# File lib/fake_sqs/queue.rb, line 171
#
# Returns the view object that #reset_messages_in_flight builds over the
# in-flight message store, rather than the underlying hash itself.
def messages_in_flight
  view = @messages_in_flight_view
  view
end
published_size()
click to toggle source
# File lib/fake_sqs/queue.rb, line 179
#
# Returns how many messages in the available store currently answer true to
# +published?+. The predicate is asked again on every call.
def published_size
  @messages.each_value.count(&:published?)
end
receive_message(options = {})
click to toggle source
# File lib/fake_sqs/queue.rb, line 58
#
# Hands out up to "MaxNumberOfMessages" published messages, chosen at random,
# and moves them from the available store to the in-flight store.
#
# options - Hash:
#           "MaxNumberOfMessages" - how many messages to return; converted
#                                   with Integer(), defaults to "1".
#           "VisibilityTimeout"   - passed to each message's +expire_at+;
#                                   converted with Integer(), defaults to
#                                   #default_visibility_timeout.
#           The whole hash is also forwarded to #check_message_for_dlq.
#
# Returns a Hash of receipt => message for the messages handed out; empty
# when the queue has no messages. A message for which #check_message_for_dlq
# returns a truthy value is removed from the available store but is neither
# returned nor put in flight.
#
# Raises ReadCountOutOfRange when more than 10 messages are requested.
def receive_message(options = {})
  amount = Integer(options.fetch("MaxNumberOfMessages") { "1" })
  visibility_timeout = Integer(options.fetch("VisibilityTimeout") { default_visibility_timeout })

  raise ReadCountOutOfRange, amount if amount > 10

  return {} if @messages.empty?

  result = {}

  with_lock do
    # Take ONE snapshot of the published messages and size everything from
    # it. Asking +published?+ again while picking (as a fresh count would)
    # can disagree with the snapshot if a message's answer changes in
    # between, producing an out-of-range index and a nil message.
    candidates = @messages.values.select(&:published?)
    take = [amount, candidates.size].min

    take.times do
      message = candidates.delete_at(rand(candidates.size))
      @messages.delete(message.receipt)
      next if check_message_for_dlq(message, options)

      message.expire_at(visibility_timeout)
      message.receive!
      @messages_in_flight[message.receipt] = message
      result[message.receipt] = message
    end
  end

  result
end
reset()
click to toggle source
# File lib/fake_sqs/queue.rb, line 144
#
# Discards all messages: inside the queue lock a new empty available store
# and a FakeSQS::CollectionView over it are installed, then
# #reset_messages_in_flight does the same for the in-flight store.
#
# Returns the result of #reset_messages_in_flight.
def reset
  with_lock do
    store = {}
    @messages = store
    @messages_view = FakeSQS::CollectionView.new(store)
    reset_messages_in_flight
  end
end
reset_messages_in_flight()
click to toggle source
# File lib/fake_sqs/queue.rb, line 160
#
# Replaces the in-flight store with a new empty hash and builds a new
# FakeSQS::CollectionView over it, inside the queue lock.
#
# Returns the new view.
def reset_messages_in_flight
  with_lock do
    store = {}
    @messages_in_flight = store
    @messages_in_flight_view = FakeSQS::CollectionView.new(store)
  end
end
send_message(options = {})
click to toggle source
# File lib/fake_sqs/queue.rb, line 48
#
# Adds a message to the available store, inside the queue lock.
#
# options - Hash. When it has a +:message+ key that value is used as the
#           message; otherwise the message is built with
#           message_factory.new(options).
#
# A truthy message is stored under its +receipt+. Returns the message (which
# may be nil/false, in which case nothing is stored).
def send_message(options = {})
  with_lock do
    enqueued = options.fetch(:message) { message_factory.new(options) }
    @messages[enqueued.receipt] = enqueued if enqueued
    enqueued
  end
end
size()
click to toggle source
# File lib/fake_sqs/queue.rb, line 175
#
# Returns the number of entries in the available message store, whether or
# not they are published. In-flight messages are not counted.
def size
  @messages.length
end
timeout_messages!()
click to toggle source
# File lib/fake_sqs/queue.rb, line 95
#
# Returns every in-flight message that reports +expired?+ to the available
# store, inside the queue lock. Each such message has +expire!+ called on it
# before it is moved.
#
# The expired entries are collected into a separate hash first so that the
# in-flight store is not modified while it is being iterated.
#
# Returns a Hash of receipt => message for the messages that were moved.
def timeout_messages!
  with_lock do
    lapsed = @messages_in_flight.select { |_receipt, message| message.expired? }

    lapsed.each do |receipt, message|
      message.expire!
      @messages[receipt] = message
      @messages_in_flight.delete(receipt)
    end
  end
end
to_yaml()
click to toggle source
# File lib/fake_sqs/queue.rb, line 28
#
# Returns a Hash (not a YAML string, despite the name) holding the queue's
# name, ARN and stored attributes under the same string keys that
# #initialize reads: "QueueName", "Arn" and "Attributes".
def to_yaml
  keys = %w[QueueName Arn Attributes]
  values = [name, arn, queue_attributes]
  keys.zip(values).to_h
end
with_lock() { || ... }
click to toggle source
# File lib/fake_sqs/queue.rb, line 183
#
# Runs the given block while holding the queue's lock (the Monitor created
# in #initialize) and returns the block's value. Because the lock is a
# Monitor, a thread that already holds it may call with_lock again.
def with_lock
  @lock.synchronize { yield }
end