class FakeServiceBus::Queue
Constants
- LOCK_DURATION
Attributes
message_factory[R]
name[R]
queue_attributes[R]
Public Class Methods
new(options = {})
click to toggle source
# File lib/fake_servicebus/queue.rb, line 19 def initialize(options = {}) @message_factory = options.fetch(:message_factory) @name = options.fetch(:name) @queue_attributes = default_attibutes.merge(options.fetch('Attributes'){ {} }) @lock = Monitor.new reset end
Public Instance Methods
add_queue_attributes(attrs)
click to toggle source
# File lib/fake_servicebus/queue.rb, line 52 def add_queue_attributes(attrs) queue_attributes.merge!(attrs) end
attributes()
click to toggle source
# File lib/fake_servicebus/queue.rb, line 56 def attributes queue_attributes.merge( "MessageCount" => @messages.size + @messages_in_flight.size, ) end
check_message_for_dlq(message, options={})
click to toggle source
# File lib/fake_servicebus/queue.rb, line 137 def check_message_for_dlq(message, options={}) if dlq_name = queue_attributes["ForwardDeadLetteredMessagesTo"] dlq = options[:queues].list.find{|queue| queue.name == dlq_name} if dlq && message.approximate_receive_count >= queue_attributes["MaxDeliveryCount"].to_i dlq.send_message(message: message) message.expire! true end end end
default_attibutes()
click to toggle source
# File lib/fake_servicebus/queue.rb, line 28 def default_attibutes { "LockDuration" => "PT1M", "MaxSizeInMegabytes" => 1024, "RequiresDuplicateDetection" => false, "RequiresSession" => false, "DefaultMessageTimeToLive" => "P10675199DT2H48M5.4775807S", "DeadLetteringOnMessageExpiration" => false, "DuplicateDetectionHistoryTimeWindow" => "PT10M", "MaxDeliveryCount" => 10, "EnableBatchedOperations" => true, "SizeInBytes" => 0, "MessageCount" => 0, "CreatedAt" => Time.now.utc.iso8601, "UpdatedAt" => Time.now.utc.iso8601, } end
delete_message(lock_token)
click to toggle source
# File lib/fake_servicebus/queue.rb, line 148 def delete_message(lock_token) with_lock do @messages.delete(lock_token) @messages_in_flight.delete(lock_token) end end
expire()
click to toggle source
# File lib/fake_servicebus/queue.rb, line 163 def expire with_lock do @messages.merge!(@messages_in_flight) @messages_in_flight.clear() reset_messages_in_flight end end
lock_duration()
click to toggle source
# File lib/fake_servicebus/queue.rb, line 92 def lock_duration if value = attributes['LockDuration'] Duration.new(value).to_i else LOCK_DURATION end end
messages()
click to toggle source
# File lib/fake_servicebus/queue.rb, line 178 def messages @messages_view end
messages_in_flight()
click to toggle source
# File lib/fake_servicebus/queue.rb, line 182 def messages_in_flight @messages_in_flight_view end
published_size()
click to toggle source
# File lib/fake_servicebus/queue.rb, line 190 def published_size @messages.values.select { |m| m.published? }.size end
receive_message(options = {})
click to toggle source
# File lib/fake_servicebus/queue.rb, line 72 def receive_message(options = {}) return nil if @messages.empty? result = nil with_lock do published_messages = @messages.values.select { |m| m.published? } message = published_messages.delete_at(0) @messages.delete(message.lock_token) unless check_message_for_dlq(message, options) message.expire_at(lock_duration) message.receive! @messages_in_flight[message.lock_token] = message result = message end end result end
renew_lock_message(lock_token)
click to toggle source
# File lib/fake_servicebus/queue.rb, line 127 def renew_lock_message(lock_token) with_lock do message = @messages_in_flight[lock_token] raise MessageNotInflight unless message message.expire_at(default_visibility_timeout) end end
reset()
click to toggle source
# File lib/fake_servicebus/queue.rb, line 155 def reset with_lock do @messages = {} @messages_view = FakeServiceBus::CollectionView.new(@messages) reset_messages_in_flight end end
reset_messages_in_flight()
click to toggle source
# File lib/fake_servicebus/queue.rb, line 171 def reset_messages_in_flight with_lock do @messages_in_flight = {} @messages_in_flight_view = FakeServiceBus::CollectionView.new(@messages_in_flight) end end
send_message(options = {})
click to toggle source
# File lib/fake_servicebus/queue.rb, line 62 def send_message(options = {}) with_lock do message = options.fetch(:message){ message_factory.new(options) } if message @messages[message.lock_token] = message end message end end
size()
click to toggle source
# File lib/fake_servicebus/queue.rb, line 186 def size @messages.size end
timeout_messages!()
click to toggle source
# File lib/fake_servicebus/queue.rb, line 100 def timeout_messages! with_lock do expired = @messages_in_flight.inject({}) do |memo,(lock_token,message)| if message.expired? memo[lock_token] = message end memo end expired.each do |lock_token,message| message.expire! @messages[lock_token] = message @messages_in_flight.delete(lock_token) end end end
to_yaml()
click to toggle source
# File lib/fake_servicebus/queue.rb, line 46 def to_yaml { "Attributes" => queue_attributes, } end
unlock_message(lock_token)
click to toggle source
# File lib/fake_servicebus/queue.rb, line 116 def unlock_message(lock_token) with_lock do message = @messages_in_flight[lock_token] raise MessageNotInflight unless message message.expire! @messages[lock_token] = message @messages_in_flight.delete(lock_token) end end
with_lock() { || ... }
click to toggle source
# File lib/fake_servicebus/queue.rb, line 194 def with_lock @lock.synchronize do yield end end