class FakeServiceBus::Queue

Constants

LOCK_DURATION

Attributes

message_factory[R]
name[R]
queue_attributes[R]

Public Class Methods

new(options = {}) click to toggle source
# File lib/fake_servicebus/queue.rb, line 19
def initialize(options = {})
  @message_factory = options.fetch(:message_factory)

  @name = options.fetch(:name)
  @queue_attributes = default_attibutes.merge(options.fetch('Attributes'){ {} })
  @lock = Monitor.new
  reset
end

Public Instance Methods

add_queue_attributes(attrs) click to toggle source
# File lib/fake_servicebus/queue.rb, line 52
def add_queue_attributes(attrs)
  queue_attributes.merge!(attrs)
end
attributes() click to toggle source
# File lib/fake_servicebus/queue.rb, line 56
def attributes
  queue_attributes.merge(
    "MessageCount" => @messages.size + @messages_in_flight.size,
  )
end
check_message_for_dlq(message, options={}) click to toggle source
# File lib/fake_servicebus/queue.rb, line 137
def check_message_for_dlq(message, options={})
  if dlq_name = queue_attributes["ForwardDeadLetteredMessagesTo"]
    dlq = options[:queues].list.find{|queue| queue.name == dlq_name}
    if dlq && message.approximate_receive_count >= queue_attributes["MaxDeliveryCount"].to_i
      dlq.send_message(message: message)
      message.expire!
      true
    end
  end
end
default_attibutes() click to toggle source
# File lib/fake_servicebus/queue.rb, line 28
def default_attibutes
  {
    "LockDuration" => "PT1M",
    "MaxSizeInMegabytes" => 1024,
    "RequiresDuplicateDetection" => false,
    "RequiresSession" => false,
    "DefaultMessageTimeToLive" => "P10675199DT2H48M5.4775807S",
    "DeadLetteringOnMessageExpiration" => false,
    "DuplicateDetectionHistoryTimeWindow" => "PT10M",
    "MaxDeliveryCount" => 10,
    "EnableBatchedOperations" => true,
    "SizeInBytes" => 0,
    "MessageCount" => 0,
    "CreatedAt" => Time.now.utc.iso8601,
    "UpdatedAt" => Time.now.utc.iso8601,
  }
end
delete_message(lock_token) click to toggle source
# File lib/fake_servicebus/queue.rb, line 148
def delete_message(lock_token)
  with_lock do
    @messages.delete(lock_token)
    @messages_in_flight.delete(lock_token)
  end
end
expire() click to toggle source
# File lib/fake_servicebus/queue.rb, line 163
def expire
  with_lock do
    @messages.merge!(@messages_in_flight)
    @messages_in_flight.clear()
    reset_messages_in_flight
  end
end
lock_duration() click to toggle source
# File lib/fake_servicebus/queue.rb, line 92
def lock_duration
  if value = attributes['LockDuration']
    Duration.new(value).to_i
  else
    LOCK_DURATION
  end
end
messages() click to toggle source
# File lib/fake_servicebus/queue.rb, line 178
def messages
  @messages_view
end
messages_in_flight() click to toggle source
# File lib/fake_servicebus/queue.rb, line 182
def messages_in_flight
  @messages_in_flight_view
end
published_size() click to toggle source
# File lib/fake_servicebus/queue.rb, line 190
def published_size
  @messages.values.select { |m| m.published? }.size
end
receive_message(options = {}) click to toggle source
# File lib/fake_servicebus/queue.rb, line 72
def receive_message(options = {})
  return nil if @messages.empty?

  result = nil
  with_lock do
    published_messages = @messages.values.select { |m| m.published? }

    message = published_messages.delete_at(0)
    @messages.delete(message.lock_token)
    unless check_message_for_dlq(message, options)
      message.expire_at(lock_duration)
      message.receive!
      @messages_in_flight[message.lock_token] = message
      result = message
    end
  end

  result
end
renew_lock_message(lock_token) click to toggle source
# File lib/fake_servicebus/queue.rb, line 127
def renew_lock_message(lock_token)

  with_lock do
    message = @messages_in_flight[lock_token]
    raise MessageNotInflight unless message

    message.expire_at(default_visibility_timeout)
   end
 end
reset() click to toggle source
# File lib/fake_servicebus/queue.rb, line 155
def reset
  with_lock do
    @messages = {}
    @messages_view = FakeServiceBus::CollectionView.new(@messages)
    reset_messages_in_flight
  end
end
reset_messages_in_flight() click to toggle source
# File lib/fake_servicebus/queue.rb, line 171
def reset_messages_in_flight
  with_lock do
    @messages_in_flight = {}
    @messages_in_flight_view = FakeServiceBus::CollectionView.new(@messages_in_flight)
  end
end
send_message(options = {}) click to toggle source
# File lib/fake_servicebus/queue.rb, line 62
def send_message(options = {})
  with_lock do
    message = options.fetch(:message){ message_factory.new(options) }
    if message
      @messages[message.lock_token] = message
    end
    message
  end
end
size() click to toggle source
# File lib/fake_servicebus/queue.rb, line 186
def size
  @messages.size
end
timeout_messages!() click to toggle source
# File lib/fake_servicebus/queue.rb, line 100
def timeout_messages!
  with_lock do
    expired = @messages_in_flight.inject({}) do |memo,(lock_token,message)|
      if message.expired?
        memo[lock_token] = message
      end
      memo
    end
    expired.each do |lock_token,message|
      message.expire!
      @messages[lock_token] = message
      @messages_in_flight.delete(lock_token)
    end
  end
end
to_yaml() click to toggle source
# File lib/fake_servicebus/queue.rb, line 46
def to_yaml
  {
    "Attributes" => queue_attributes,
  }
end
unlock_message(lock_token) click to toggle source
# File lib/fake_servicebus/queue.rb, line 116
def unlock_message(lock_token)
  with_lock do
    message = @messages_in_flight[lock_token]
    raise MessageNotInflight unless message

    message.expire!
    @messages[lock_token] = message
    @messages_in_flight.delete(lock_token)
  end
end
with_lock() { || ... } click to toggle source
# File lib/fake_servicebus/queue.rb, line 194
def with_lock
  @lock.synchronize do
    yield
  end
end