class Bunny::Exchange

Represents AMQP 0.9.1 exchanges.

@see rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
@see rubybunny.info/articles/extensions.html RabbitMQ Extensions guide

Attributes

channel[R]

@return [Bunny::Channel]

name[R]

@return [String]

opts[RW]

Options hash this exchange instance was instantiated with.
@return [Hash]

status[R]

@return [Symbol]
@api plugin

type[R]

Type of this exchange (one of: :direct, :fanout, :topic, :headers).
@return [Symbol]

Public Class Methods

default(channel_or_connection)

The default exchange. This exchange is a direct exchange that is predefined by the broker and that cannot be removed. Every queue is bound to this exchange by default with the following routing semantics: messages will be routed to the queue with the same name as the message's routing key. In other words, if a message is published with a routing key of “weather.usa.ca.sandiego” and there is a queue with this name, the message will be routed to the queue.

@param [Bunny::Channel] channel_or_connection Channel to use. {Bunny::Session} instances are only supported for backwards compatibility.

@example Publishing a message to the tasks queue

channel     = Bunny::Channel.new(connection)
tasks_queue = channel.queue("tasks")
Bunny::Exchange.default(channel).publish("make clean", :routing_key => "tasks")

@see rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
@see www.rabbitmq.com/resources/specs/amqp0-9-1.pdf AMQP 0.9.1 specification (Section 2.1.2.4)
@note Do not confuse the default exchange with amq.direct: amq.direct is a pre-defined direct exchange that doesn't have any special routing semantics.

@return [Exchange] An instance that corresponds to the default exchange (of type direct).
@api public

# File lib/bunny/exchange.rb, line 54
def self.default(channel_or_connection)
  self.new(channel_or_connection, :direct, AMQ::Protocol::EMPTY_STRING, :no_declare => true)
end

new(channel, type, name, opts = {})

@param [Bunny::Channel] channel Channel this exchange will use.
@param [Symbol,String] type Exchange type
@param [String] name Exchange name
@param [Hash] opts Exchange properties

@option opts [Boolean] :durable (false) Should this exchange be durable?
@option opts [Boolean] :auto_delete (false) Should this exchange be automatically deleted when it is no longer used?
@option opts [Hash] :arguments ({}) Additional optional arguments (typically used by RabbitMQ extensions and plugins)

@see Bunny::Channel#topic
@see Bunny::Channel#fanout
@see Bunny::Channel#direct
@see Bunny::Channel#headers
@see rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
@see rubybunny.info/articles/extensions.html RabbitMQ Extensions guide
@api public

# File lib/bunny/exchange.rb, line 74
def initialize(channel, type, name, opts = {})
  @channel          = channel
  @name             = name
  @type             = type
  @options          = self.class.add_default_options(name, opts)

  @durable          = @options[:durable]
  @auto_delete      = @options[:auto_delete]
  @internal         = @options[:internal]
  @arguments        = @options[:arguments]

  @bindings         = Set.new

  declare! unless opts[:no_declare] || predeclared? || (@name == AMQ::Protocol::EMPTY_STRING)

  @channel.register_exchange(self)
end

Protected Class Methods

add_default_options(name, opts)

@private

# File lib/bunny/exchange.rb, line 258
def self.add_default_options(name, opts)
  # :nowait is always false for Bunny
  h = { :queue => name, :nowait => false }.merge(opts)

  if name.empty?
    {
      :passive     => false,
      :durable     => false,
      :auto_delete => false,
      :internal    => false,
      :arguments   => nil
    }.merge(h)
  else
    h
  end
end
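The merge order in the listing above can be explored with a standalone copy of the method. This is only a sketch for experimentation: it reproduces the hash logic shown here and nothing else, and the variable names are illustrative. It suggests two things worth knowing. The boolean defaults are filled in only when the name is empty, so for a named exchange any key the caller omits is simply absent. And because `opts` is merged last, a caller-supplied value replaces the initial `:nowait => false` entry in the resulting hash (what Bunny does with that key later is not shown in this section).

```ruby
# Standalone copy of the merge logic from the listing above (illustration only).
def add_default_options(name, opts)
  h = { :queue => name, :nowait => false }.merge(opts)

  if name.empty?
    {
      :passive     => false,
      :durable     => false,
      :auto_delete => false,
      :internal    => false,
      :arguments   => nil
    }.merge(h)
  else
    h
  end
end

named      = add_default_options("events", :durable => true)
unnamed    = add_default_options("", {})
overridden = add_default_options("events", :nowait => true)

puts named.key?(:auto_delete)   # omitted keys are absent for a named exchange
puts unnamed[:durable].inspect  # defaults are present when the name is empty
puts overridden[:nowait]        # the caller's value wins in the merged hash
```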

Public Instance Methods

arguments()

@return [Hash] Additional optional arguments (typically used by RabbitMQ extensions and plugins)
@api public

# File lib/bunny/exchange.rb, line 112
def arguments
  @arguments
end

auto_delete?()

@return [Boolean] true if this exchange was declared as auto-delete (deleted once the last queue or exchange bound to it is unbound).
@api public

# File lib/bunny/exchange.rb, line 100
def auto_delete?
  @auto_delete
end

bind(source, opts = {})

Binds this exchange to another (source) exchange using the exchange.bind extension to AMQP 0.9.1 that RabbitMQ provides.

@param [String] source Source exchange name
@param [Hash] opts Options

@option opts [String] :routing_key (nil) Routing key used for binding
@option opts [Hash] :arguments ({}) Optional arguments

@return [Bunny::Exchange] Self
@see rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
@see rubybunny.info/articles/bindings.html Bindings guide
@see rubybunny.info/articles/extensions.html RabbitMQ Extensions guide
@api public

# File lib/bunny/exchange.rb, line 174
def bind(source, opts = {})
  @channel.exchange_bind(source, self, opts)
  @bindings.add(source: source, opts: opts)

  self
end
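
The listing above records each binding in a `Set` as a hash of `source` and `opts`. The following sketch uses only Ruby's standard `Set` (no Bunny objects; the exchange names and routing keys are made up) to show what that bookkeeping implies: binding twice with equal arguments is stored once, and removing a recorded entry requires a hash that compares equal to the one that was added.

```ruby
require "set"

# Stand-in for the @bindings instance variable (illustration only).
bindings = Set.new

bindings.add(source: "logs", opts: { routing_key: "error" })
bindings.add(source: "logs", opts: { routing_key: "error" }) # equal hash, stored once
bindings.add(source: "logs", opts: { routing_key: "warn" })
size_after_adds = bindings.size

# Deleting with different opts does not match any recorded entry.
bindings.delete(source: "logs", opts: {})
size_after_mismatch = bindings.size

# Deleting with equal source and opts removes the entry.
bindings.delete(source: "logs", opts: { routing_key: "error" })
size_after_match = bindings.size

puts [size_after_adds, size_after_mismatch, size_after_match].inspect
```

Since the recorded set is what gets replayed during recovery, passing the same options when unbinding as were used when binding appears to be the way to keep it accurate.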

delete(opts = {})

Deletes the exchange unless it is predeclared.

@param [Hash] opts Options

@option opts [Boolean] :if_unused (false) Should this exchange be deleted only if it is no longer used

@see rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
@api public

# File lib/bunny/exchange.rb, line 155
def delete(opts = {})
  @channel.deregister_exchange(self)
  @channel.exchange_delete(@name, opts) unless predeclared?
end
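
As the listing shows, the exchange is always deregistered from the channel, while the delete request is sent only for exchanges that are not predeclared. A minimal sketch of that control flow follows. `RecordingChannel` and `SketchExchange` are hypothetical stand-ins written for this example, not Bunny classes; they only record which calls would be made.

```ruby
# Hypothetical stand-in for a channel: records the calls it receives.
class RecordingChannel
  attr_reader :calls

  def initialize
    @calls = []
  end

  def deregister_exchange(exchange)
    @calls << [:deregister, exchange.name]
  end

  def exchange_delete(name, opts)
    @calls << [:delete, name, opts]
  end
end

# Hypothetical stand-in that copies only the delete logic shown above.
class SketchExchange
  attr_reader :name

  def initialize(channel, name)
    @channel = channel
    @name    = name
  end

  def predeclared?
    @name.empty? || !!(@name =~ /^amq\.(direct|fanout|topic|headers|match)/i)
  end

  def delete(opts = {})
    @channel.deregister_exchange(self)
    @channel.exchange_delete(@name, opts) unless predeclared?
  end
end

custom_channel = RecordingChannel.new
SketchExchange.new(custom_channel, "events").delete(:if_unused => true)

builtin_channel = RecordingChannel.new
SketchExchange.new(builtin_channel, "amq.topic").delete

puts custom_channel.calls.inspect  # deregister, then delete
puts builtin_channel.calls.inspect # deregister only
```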

durable?()

@return [Boolean] true if this exchange was declared as durable (will survive broker restart).
@api public

# File lib/bunny/exchange.rb, line 94
def durable?
  @durable
end

handle_return(basic_return, properties, content)

@private

# File lib/bunny/exchange.rb, line 236
def handle_return(basic_return, properties, content)
  if @on_return
    @on_return.call(basic_return, properties, content)
  else
    # TODO: log a warning
  end
end

internal?()

@return [Boolean] true if this exchange is internal (used solely for exchange-to-exchange bindings and cannot be published to by clients)

# File lib/bunny/exchange.rb, line 106
def internal?
  @internal
end

on_return(&block)

Defines a block that will handle returned messages.
@see rubybunny.info/articles/exchanges.html
@api public

# File lib/bunny/exchange.rb, line 205
def on_return(&block)
  @on_return = block

  self
end
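
The two listings for `handle_return` and `on_return` together form a simple callback registry: `on_return` stores the block and returns the receiver, and `handle_return` invokes the stored block if there is one. The sketch below copies just that logic into a hypothetical `ReturnSketch` class so it can be run without a broker. The symbols and strings passed in are placeholders, not real `basic.return` frames.

```ruby
# Hypothetical class that copies only the callback logic shown above.
class ReturnSketch
  def on_return(&block)
    @on_return = block

    self
  end

  def handle_return(basic_return, properties, content)
    @on_return.call(basic_return, properties, content) if @on_return
  end
end

returned = []
exchange = ReturnSketch.new

# No handler registered yet: the returned message is ignored.
exchange.handle_return(:placeholder_return, {}, "ignored")

# on_return returns the receiver, so it can be chained.
same = exchange.on_return { |_basic_return, _properties, content| returned << content }

exchange.handle_return(:placeholder_return, {}, "unroutable payload")

puts returned.inspect
```

Registering the handler before publishing with `:mandatory` set avoids the first case, where a returned message is dropped without notice.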
predeclared?()
Alias for: predefined?
predefined?()

@return [Boolean] true if this exchange is a pre-defined one (amq.direct, amq.fanout, amq.match and so on)

# File lib/bunny/exchange.rb, line 245
def predefined?
  (@name == AMQ::Protocol::EMPTY_STRING) || !!(@name =~ /^amq\.(direct|fanout|topic|headers|match)/i)
end
Also aliased as: predeclared?
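
The check above treats the empty name and any name matching the `amq.` pattern as predefined, and the constructor, `delete`, and `recover_from_network_failure` listings all consult it. The sketch below lifts the expression into a hypothetical free function, `predefined_name?`, to show which names it accepts. Note that the pattern is anchored only at the start and is case-insensitive, so a name that merely begins with one of the listed prefixes also matches.

```ruby
# Same pattern as in the listing above.
PREDEFINED_PATTERN = /^amq\.(direct|fanout|topic|headers|match)/i

# Hypothetical helper mirroring predefined? for a plain string.
def predefined_name?(name)
  name == "" || !!(name =~ PREDEFINED_PATTERN)
end

names = ["", "amq.direct", "AMQ.Topic", "amq.direct.audit",
         "amq.rabbitmq.trace", "events", "my.amq.direct"]

results = names.map { |n| [n, predefined_name?(n)] }
results.each { |name, flag| puts "#{name.inspect}: #{flag}" }
```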

publish(payload, opts = {})

Publishes a message.

@param [String] payload Message payload. It will never be modified by Bunny or RabbitMQ in any way.
@param [Hash] opts Message properties (metadata) and delivery settings

@option opts [String] :routing_key Routing key
@option opts [Boolean] :persistent Should the message be persisted to disk?
@option opts [Boolean] :mandatory Should the message be returned if it cannot be routed to any queue?
@option opts [Integer] :timestamp A timestamp associated with this message
@option opts [Integer] :expiration Expiration time after which the message will be deleted
@option opts [String] :type Message type, e.g. what type of event or command this message represents. Can be any string
@option opts [String] :reply_to Queue name other apps should send the response to
@option opts [String] :content_type Message content type (e.g. application/json)
@option opts [String] :content_encoding Message content encoding (e.g. gzip)
@option opts [String] :correlation_id Message correlated to this one, e.g. what request this message is a reply for
@option opts [Integer] :priority Message priority, 0 to 9. Not used by RabbitMQ, only applications
@option opts [String] :message_id Any message identifier
@option opts [String] :user_id Optional user ID. Verified by RabbitMQ against the actual connection username
@option opts [String] :app_id Optional application ID

@return [Bunny::Exchange] Self
@see rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
@api public

# File lib/bunny/exchange.rb, line 140
def publish(payload, opts = {})
  @channel.basic_publish(payload, self.name, (opts.delete(:routing_key) || opts.delete(:key)), opts)

  self
end
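
One consequence of the listing above is easy to miss: the routing key is taken out of `opts` with `Hash#delete`, so a hash object that the caller reuses across calls loses its `:routing_key` after the first publish. The sketch below demonstrates this with hypothetical stand-ins (`CapturingChannel` and `PublishSketch` are written for this example and only copy the method body shown here).

```ruby
# Hypothetical stand-in for a channel: captures what would be published.
class CapturingChannel
  attr_reader :published

  def initialize
    @published = []
  end

  def basic_publish(payload, exchange_name, routing_key, opts)
    @published << { :payload => payload, :exchange => exchange_name,
                    :routing_key => routing_key, :opts => opts.dup }
  end
end

# Hypothetical stand-in that copies only the publish logic shown above.
class PublishSketch
  attr_reader :name

  def initialize(channel, name)
    @channel = channel
    @name    = name
  end

  def publish(payload, opts = {})
    @channel.basic_publish(payload, self.name, (opts.delete(:routing_key) || opts.delete(:key)), opts)

    self
  end
end

channel  = CapturingChannel.new
exchange = PublishSketch.new(channel, "")

shared = { :routing_key => "tasks", :persistent => true }
exchange.publish("first", shared)   # routing key is removed from shared here
exchange.publish("second", shared)  # no routing key left to use

# Passing a copy keeps the caller's hash intact; publish returns self, so calls chain.
template = { :routing_key => "tasks", :persistent => true }
exchange.publish("third", template.dup).publish("fourth", template.dup)

puts channel.published.map { |m| m[:routing_key].inspect }.join(", ")
```

Building the options hash per call, or passing a `dup` as in the last lines, sidesteps the issue.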

recover_from_network_failure()

@private

# File lib/bunny/exchange.rb, line 222
def recover_from_network_failure
  declare! unless @options[:no_declare] || predefined?

  @bindings.each do |b|
    bind(b[:source], b[:opts])
  end
end

unbind(source, opts = {})

Unbinds this exchange from another (source) exchange using the exchange.unbind extension to AMQP 0.9.1 that RabbitMQ provides.

@param [String] source Source exchange name
@param [Hash] opts Options

@option opts [String] :routing_key (nil) Routing key used for binding
@option opts [Hash] :arguments ({}) Optional arguments

@return [Bunny::Exchange] Self
@see rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
@see rubybunny.info/articles/bindings.html Bindings guide
@see rubybunny.info/articles/extensions.html RabbitMQ Extensions guide
@api public

# File lib/bunny/exchange.rb, line 195
def unbind(source, opts = {})
  @channel.exchange_unbind(source, self, opts)
  @bindings.delete(source: source, opts: opts)

  self
end

wait_for_confirms()

Waits until all outstanding publisher confirms on the channel arrive.

This is a convenience method that delegates to {Bunny::Channel#wait_for_confirms}

@api public

# File lib/bunny/exchange.rb, line 217
def wait_for_confirms
  @channel.wait_for_confirms
end

Protected Instance Methods

declare!()

@private

# File lib/bunny/exchange.rb, line 253
def declare!
  @channel.exchange_declare(@name, @type, @options)
end