class BunnyMock::Queue

Attributes

channel[R]

@return [BunnyMock::Channel] Channel used by queue

name[R]

@return [String] Queue name

opts[R]

@return [Hash] Creation options

Public Class Methods

new(channel, name = '', opts = {}) click to toggle source

Create a new [BunnyMock::Queue] instance

@param [BunnyMock::Channel] channel Channel this queue will use
@param [String] name Name of queue
@param [Hash] opts Creation options

@see BunnyMock::Channel#queue

# File lib/bunny_mock/queue.rb, line 26
# Set up a queue and remember how it was created.
#
# @param channel [BunnyMock::Channel] channel this queue will use
# @param name [String] name of the queue
# @param opts [Hash] creation options
def initialize(channel, name = '', opts = {})
  # Creation details, kept exactly as they were passed in
  @channel, @name, @opts = channel, name, opts

  # Messages held by the queue; new ones are appended at the end
  @messages = []
end

Public Instance Methods

all() click to toggle source

Get all messages in queue

@return [Array] All messages
@api public

# File lib/bunny_mock/queue.rb, line 221
# Get all messages in queue
#
# Returns the internal message array itself, not a copy, so changes made
# through the returned array are visible to the queue.
#
# @return [Array] All messages
# @api public
def all
  @messages
end
bind(exchange, opts = {}) click to toggle source

Bind this queue to an exchange

@param [BunnyMock::Exchange,String] exchange Exchange to bind to
@param [Hash] opts Binding properties

@option opts [String] :routing_key Custom routing key

@api public

# File lib/bunny_mock/queue.rb, line 114
# Bind this queue to an exchange.
#
# An exchange that responds to +add_route+ is bound directly; anything else
# is handed to the channel's +queue_bind+ together with the routing key.
#
# @param exchange [BunnyMock::Exchange, String] exchange to bind to
# @param opts [Hash] binding properties
# @option opts [String] :routing_key custom routing key (the queue name when omitted)
# @return [BunnyMock::Queue] self
# @api public
def bind(exchange, opts = {})
  check_queue_deleted!

  routing_key = opts.fetch(:routing_key, @name)

  if exchange.respond_to?(:add_route)
    # the exchange object can register the route itself
    exchange.add_route(routing_key, self)
  else
    # the channel has to look the exchange up
    @channel.queue_bind(self, routing_key, exchange)
  end

  self
end
bound_to?(exchange, opts = {}) click to toggle source

Check if this queue is bound to the exchange

@param [BunnyMock::Exchange,String] exchange Exchange to test
@param [Hash] opts Binding properties

@option opts [String] :routing_key Routing key from binding

@return [Boolean] true if this queue is bound to the given exchange, false otherwise
@api public

# File lib/bunny_mock/queue.rb, line 166
# Check whether this queue is bound to the given exchange.
#
# @param exchange [BunnyMock::Exchange, String] exchange to test
# @param opts [Hash] binding properties
# @option opts [String] :routing_key routing key from the binding (the queue name when omitted)
# @return [Boolean] result reported by the exchange or by the channel
# @api public
def bound_to?(exchange, opts = {})
  check_queue_deleted!

  # an exchange object can answer the question itself; it receives the full opts hash
  return exchange.routes_to?(self, opts) if exchange.respond_to?(:routes_to?)

  # otherwise the channel has to look the exchange up
  @channel.xchg_routes_to?(self, opts.fetch(:routing_key, @name), exchange)
end
delete() click to toggle source

Deletes this queue

@api public

# File lib/bunny_mock/queue.rb, line 230
# Delete this queue.
#
# Asks the channel to deregister the queue, then flags it as deleted so that
# operations guarded by +check_queue_deleted!+ raise afterwards.
#
# @return [true]
# @api public
def delete
  # deregister first: if the channel raises, the queue is not flagged as deleted
  @channel.deregister_queue(self)
  @deleted = true
end
get(opts = { manual_ack: false }, &block)
Alias for: pop
message_count() click to toggle source

Count of messages in queue

@return [Integer] Number of messages in queue
@api public

# File lib/bunny_mock/queue.rb, line 184
# Number of messages currently held by the queue.
#
# @return [Integer] number of messages in queue
# @api public
def message_count
  @messages.size
end
pop(opts = { manual_ack: false }, &block) click to toggle source

Get oldest message in queue

@return [Hash] Message data
@api public

# File lib/bunny_mock/queue.rb, line 194
# Get the oldest message in the queue.
#
# When +BunnyMock.use_bunny_queue_pop_api+ is truthy the call is forwarded,
# with its options and block, to +bunny_pop+. Otherwise a deprecation warning
# is emitted and the raw stored message is removed from the front of the
# queue and returned; the block is not used on that path.
#
# @param opts [Hash] pop options, forwarded to +bunny_pop+
# @return [Object] result of +bunny_pop+, or the raw stored message (nil when empty)
# @api public
def pop(opts = { manual_ack: false }, &block)
  return bunny_pop(opts, &block) if BunnyMock.use_bunny_queue_pop_api

  warn '[DEPRECATED] This behavior is deprecated - please set `BunnyMock::use_bunny_queue_pop_api` to true to use Bunny Queue#pop behavior'
  @messages.shift
end
Also aliased as: get
publish(payload, opts = {}) click to toggle source

Publish a message

@param [Object] payload Message payload
@param [Hash] opts Message properties

@option opts [String] :routing_key Routing key
@option opts [Boolean] :persistent Should the message be persisted to disk?
@option opts [Boolean] :mandatory Should the message be returned if it cannot be routed to any queue?
@option opts [Integer] :timestamp A timestamp associated with this message
@option opts [Integer] :expiration Expiration time after which the message will be deleted
@option opts [String] :type Message type, e.g. what type of event or command this message represents. Can be any string
@option opts [String] :reply_to Queue name other apps should send the response to
@option opts [String] :content_type Message content type (e.g. application/json)
@option opts [String] :content_encoding Message content encoding (e.g. gzip)
@option opts [String] :correlation_id Message correlated to this one, e.g. what request this message is a reply for
@option opts [Integer] :priority Message priority, 0 to 9. Not used by RabbitMQ, only applications
@option opts [String] :message_id Any message identifier
@option opts [String] :user_id Optional user ID. Verified by RabbitMQ against the actual connection username
@option opts [String] :app_id Optional application ID

@return [BunnyMock::Queue] self
@see BunnyMock::Exchange#publish
@api public

# File lib/bunny_mock/queue.rb, line 63
# Publish a message to this queue.
#
# The payload and its options are stored as one entry at the end of the
# queue, after which any registered consumers are given a chance to run.
#
# @param payload [Object] message payload
# @param opts [Hash] message properties, stored untouched next to the payload
# @return [BunnyMock::Queue] self
# @api public
def publish(payload, opts = {})
  check_queue_deleted!

  # keep payload and properties together as a single queue entry
  entry = { message: payload, options: opts }
  @messages.push(entry)

  yield_consumers
  self
end
purge() click to toggle source

Clear all messages in queue

@api public

# File lib/bunny_mock/queue.rb, line 209
# Clear all messages in the queue.
#
# @return [BunnyMock::Queue] self
# @api public
def purge
  # swap in a fresh array rather than emptying the current one in place
  @messages = Array.new
  self
end
subscribe(*args, &block) click to toggle source

Adds a consumer to the queue (subscribes for message deliveries).

Any arguments are stored alongside the block so that they are available when a message is processed. Takes a block which is called when a message is delivered to the queue.

@api public

# File lib/bunny_mock/queue.rb, line 80
# Add a block consumer to the queue (subscribe for message deliveries).
#
# The block is remembered together with the given arguments, then the
# consumers are given a chance to run straight away.
#
# @param args [Array] extra arguments kept with the block for use at delivery time
# @return [BunnyMock::Queue] self
# @api public
def subscribe(*args, &block)
  # the consumer list is created lazily on first subscription
  (@consumers ||= []).push([block, args])
  yield_consumers

  self
end
subscribe_with(consumer, *args) click to toggle source

Adds a specific consumer object to the queue (subscribes for message deliveries).

@param [#call] consumer A subclass of Bunny::Consumer or any callable object

Any additional arguments are stored alongside the consumer so that they are available when a message is processed.

@api public

# File lib/bunny_mock/queue.rb, line 96
# Add a specific consumer object to the queue (subscribe for message deliveries).
#
# The consumer is remembered together with the given arguments, then the
# consumers are given a chance to run straight away.
#
# @param consumer [#call] object invoked with each delivery
# @param args [Array] extra arguments kept with the consumer for use at delivery time
# @return [BunnyMock::Queue] self
# @api public
def subscribe_with(consumer, *args)
  # the consumer list is created lazily on first subscription
  @consumers = [] unless @consumers
  @consumers.push([consumer, args])
  yield_consumers

  self
end
unbind(exchange, opts = {}) click to toggle source

Unbind this queue from an exchange

@param [BunnyMock::Exchange,String] exchange Exchange to unbind from
@param [Hash] opts Binding properties

@option opts [String] :routing_key Custom routing key

@api public

# File lib/bunny_mock/queue.rb, line 139
# Unbind this queue from an exchange.
#
# An exchange that responds to +remove_route+ is unbound directly; anything
# else is handed to the channel's +queue_unbind+ together with the routing key.
#
# @param exchange [BunnyMock::Exchange, String] exchange to unbind from
# @param opts [Hash] binding properties
# @option opts [String] :routing_key custom routing key (the queue name when omitted)
# @return [BunnyMock::Queue] self
# @api public
def unbind(exchange, opts = {})
  check_queue_deleted!

  routing_key = opts.fetch(:routing_key, @name)

  if exchange.respond_to?(:remove_route)
    # the exchange object can drop the route itself
    exchange.remove_route(routing_key, self)
  else
    # the channel has to look the exchange up
    @channel.queue_unbind(self, routing_key, exchange)
  end

  # Return the queue, as #bind does, instead of leaking whatever the
  # exchange or channel happened to return; this keeps calls chainable.
  self
end

Private Instance Methods

bunny_pop(*) { |*response| ... } click to toggle source

@private

# File lib/bunny_mock/queue.rb, line 243
# Remove the oldest message and present it as a pop response.
#
# Any arguments are accepted and ignored. With a block, the elements of the
# response are yielded and the block's value is returned; without one the
# response itself is returned.
#
# @private
def bunny_pop(*)
  response = pop_response(@messages.shift)
  return response unless block_given?

  yield(*response)
end
check_queue_deleted!() click to toggle source

@private

# File lib/bunny_mock/queue.rb, line 238
# Guard used by operations that must not run on a deleted queue.
#
# @raise [RuntimeError] when the queue has been flagged as deleted
# @return [nil] otherwise
# @private
def check_queue_deleted!
  return unless @deleted

  raise 'Queue has been deleted'
end
pop_response(message) click to toggle source

@private

# File lib/bunny_mock/queue.rb, line 249
# Turn a stored queue entry into a three-element pop response.
#
# @param message [Hash, nil] stored entry with :message and :options keys
# @return [Array] GetResponse, MessageProperties and payload, in that order;
#   three nils when there is no message
# @private
def pop_response(message)
  return Array.new(3) unless message

  options = message[:options]
  delivery_info = GetResponse.new(@channel, self, options)
  properties = MessageProperties.new(options)

  [delivery_info, properties, message[:message]]
end
store_acknowledgement(response, args) click to toggle source
# File lib/bunny_mock/queue.rb, line 271
# Record a delivery as awaiting acknowledgement when the consumer asked for
# manual acks.
#
# Nothing is recorded unless the first consumer argument is a Hash with a
# truthy :manual_ack entry.
#
# @param response [Array] pop response; its first element is indexed with :delivery_tag
# @param args [Array] arguments the consumer was registered with
# @return [Array, nil] the stored response, or nil when nothing was recorded
def store_acknowledgement(response, args)
  consumer_opts = args[0]
  return unless consumer_opts.is_a?(Hash) && consumer_opts[:manual_ack]

  delivery_tag = response[0][:delivery_tag]
  @channel.acknowledged_state[:pending][delivery_tag] = response
end
yield_consumers() click to toggle source

@private

# File lib/bunny_mock/queue.rb, line 259
# Deliver every queued message to the registered consumers.
#
# Does nothing when no consumer has subscribed yet. Messages are taken from
# the front of the queue, so they are delivered oldest first, the same order
# used when popping messages manually. Each consumer is called with the whole
# pop response as a single argument. Because the loop for a consumer runs
# until the queue is empty, the first consumer in the list receives
# everything that is queued when this method runs.
#
# @return [Array, nil] the consumer list, or nil when there are no consumers
# @private
def yield_consumers
  return if @consumers.nil?

  @consumers.each do |consumer, args|
    # shift (not pop) so that messages published before the consumer
    # subscribed are delivered in publication order
    while (message = @messages.shift)
      response = pop_response(message)
      store_acknowledgement(response, args)
      consumer.call(response)
    end
  end
end