module Messaging::Client

Provides methods and constants required to establish an AMQP connection and channel with failure handling and recovery.

@see www.rabbitmq.com/amqp-0-9-1-reference.html#constants
  For the list of error codes that cause an exception to be raised
  rather than invoking automatic recovery.

Public Instance Methods

declare_exchange(channel, name, type, options = {}) click to toggle source

Declare an exchange on the specified channel.

@param channel [AMQP::Channel] @param name [String] @param type [String] @param options [Hash] @return [AMQP::Exchange] @api public

# File lib/messaging/client.rb, line 91
# Declare an exchange on the specified channel.
#
# When +name+ is recognised by +default_exchange?+ the channel's default
# exchange is returned and +type+ / +options+ are not used. Otherwise the
# exchange is obtained by calling the channel method named by +type+ with
# +name+ and +options+.
#
# NOTE(review): if +default_exchange?+ also accepts the pre-declared
# "amq.*" names, those resolve to +channel.default_exchange+ here as well --
# confirm that this is intended.
#
# @param channel [AMQP::Channel]
# @param name [String]
# @param type [String] name of the channel method used to declare the exchange
# @param options [Hash]
# @return [AMQP::Exchange]
# @raise [NoMethodError] if +type+ does not name a public method of +channel+
# @api public
def declare_exchange(channel, name, type, options = {})
  exchange =
    if default_exchange?(name)
      channel.default_exchange
    else
      # public_send rather than send: +type+ is caller-supplied, so it must
      # only be able to reach the channel's public API, never private methods.
      channel.public_send(type, name, options)
    end

  log.debug("Exchange #{exchange.name.inspect} declared")

  exchange
end
declare_queue(channel, exchange, name, key, options = {}) click to toggle source

Declare and bind a queue to the specified exchange via the supplied routing key.

@param channel [AMQP::Channel] @param exchange [AMQP::Exchange] @param name [String] @param key [String] @param options [Hash] @return [AMQP::Queue] @api public

# File lib/messaging/client.rb, line 115
# Declare a queue on the channel and bind it to the given exchange.
#
# The binding is skipped when +default_exchange?+ recognises the exchange
# name. The debug message is logged in both cases.
#
# @param channel [AMQP::Channel]
# @param exchange [AMQP::Exchange]
# @param name [String]
# @param key [String] routing key used for the binding
# @param options [Hash]
# @return [AMQP::Queue] the value returned by +channel.queue+
# @api public
def declare_queue(channel, exchange, name, key, options = {})
  channel.queue(name, options) do |declared|
    needs_binding = !default_exchange?(exchange.name)
    bind_opts = { :routing_key => key }
    declared.bind(exchange, bind_opts) if needs_binding

    log.debug("Queue #{declared.name.inspect} bound to #{exchange.name.inspect}")
  end
end
disconnect() click to toggle source

Close all channels and then disconnect all the connections.

@return [Array<AMQP::Connection>] the registered connections @api public

# File lib/messaging/client.rb, line 130
# Close every registered channel, then disconnect every registered
# connection, in that order.
#
# @return [Array<AMQP::Connection>] the value returned by +connections.each+
# @api public
def disconnect
  channels.each(&:close)
  connections.each(&:disconnect)
end
open_channel(connection, prefetch = nil) click to toggle source

Open an AMQP::Channel with auto-recovery and error handling.

@param connection [AMQP::Connection] @param prefetch [Integer, nil] @return [AMQP::Channel] @api public

# File lib/messaging/client.rb, line 62
# Open an AMQP::Channel on +connection+ and register it.
#
# Inside the channel's open callback this enables auto-recovery, applies
# +prefetch+ when one is given, and installs an error handler. The handler
# logs every channel error and raises MessagingError for reply codes
# 403..406 instead of leaving them to recovery.
#
# @param connection [AMQP::Connection]
# @param prefetch [Integer, nil] prefetch value; not applied when nil
# @return [AMQP::Channel] the value returned by +register_channel+
# @api public
def open_channel(connection, prefetch = nil)
  opened = AMQP::Channel.new(connection) do |channel, _open_ok|
    channel.auto_recovery = true
    prefetch && channel.prefetch(prefetch)

    channel.on_error do |_ch, error|
      log.error("Channel error #{error.reply_text.inspect}, recovering")

      # Only erroneous channel calls/conditions are raised; anything else
      # falls through so that it is not retried endlessly by the caller.
      next unless (403..406).cover?(error.reply_code)

      raise MessagingError, "Channel exception: #{error.reply_text.inspect}"
    end

    log.debug("Channel #{channel.id} created")
  end

  register_channel(opened)
end
open_connection(uri, delay = nil) click to toggle source

Create an AMQP::Connection with auto-reconnect and error handling.

@param uri [String] The AMQP URI to connect to. @param delay [Integer, nil] Time to delay between reconnection attempts. @return [AMQP::Connection] @api public

# File lib/messaging/client.rb, line 25
# Create an AMQP connection to +uri+ and register it.
#
# The connection options are parsed from +uri+ and extended with the
# configured heartbeat. Two handlers are installed in the open callback:
# * TCP connection loss: log, then periodically reconnect.
# * Connection error: log, raise MessagingError for reply codes 402..540,
#   otherwise periodically reconnect.
#
# @param uri [String] The AMQP URI to connect to.
# @param delay [Integer, nil] Time to delay between reconnection attempts;
#   falls back to +config.reconnect_delay+ when nil.
# @return [AMQP::Connection] the value returned by +register_connection+
# @api public
def open_connection(uri, delay = nil)
  retry_delay = delay || config.reconnect_delay

  conn_opts = AMQP::Client.parse_connection_uri(uri)
  conn_opts[:heartbeat] = config.heartbeat

  established = AMQP.connect(conn_opts) do |connection, _open_ok|
    # Handle TCP connection errors
    connection.on_tcp_connection_loss do |conn, _settings|
      log.error("Connection to #{uri.inspect} lost, reconnecting")

      conn.periodically_reconnect(retry_delay)
    end

    # Handle general errors
    connection.on_error do |conn, error|
      log.error("Connection to #{uri.inspect} lost, reconnecting")

      fatal = (402..540).cover?(error.reply_code)
      raise MessagingError, "Connection exception: #{error.reply_text.inspect}" if fatal

      conn.periodically_reconnect(retry_delay)
    end

    log.debug("Connection to #{uri.inspect} started")
  end

  register_connection(established)
end

Protected Instance Methods

config() click to toggle source

@return [Messaging::Configuration] @api protected

# File lib/messaging/client.rb, line 150
# Configuration object used by the methods in this module.
#
# @return [Messaging::Configuration] the value of +Configuration.instance+
# @api protected
def config
  Configuration.instance
end
log() click to toggle source

@return [#info, #debug, #error] @api protected

# File lib/messaging/client.rb, line 144
# Logger taken from the configuration.
#
# @return [#info, #debug, #error] the value of +config.logger+
# @api protected
def log
  config.logger
end

Private Instance Methods

channels() click to toggle source

@return [Array<AMQP::Channel>] @api private

# File lib/messaging/client.rb, line 158
# Registry of channels, created as an empty array on first access and
# memoized in +@channels+.
#
# @return [Array<AMQP::Channel>]
# @api private
def channels
  @channels ||= []
end
connections() click to toggle source

@return [Array<AMQP::Connection>] @api private

# File lib/messaging/client.rb, line 172
# Registry of connections, created as an empty array on first access and
# memoized in +@connections+.
#
# @return [Array<AMQP::Connection>]
# @api private
def connections
  @connections ||= []
end
default_exchange?(name) click to toggle source

@param name [String] @return [Boolean] @api private

# File lib/messaging/client.rb, line 187
# Whether +name+ is the empty string, "amq.default", or one of the listed
# "amq.*" exchange names.
#
# @param name [String]
# @return [Boolean]
# @api private
def default_exchange?(name)
  case name
  when "", "amq.default",
       "amq.direct", "amq.fanout", "amq.topic",
       "amq.headers", "amq.match"
    true
  else
    false
  end
end
register_channel(channel) click to toggle source

@param channel [AMQP::Channel] @return [AMQP::Channel] @api private

# File lib/messaging/client.rb, line 165
# Append +channel+ to the channel registry and hand it back.
#
# @param channel [AMQP::Channel]
# @return [AMQP::Channel] the same object that was passed in
# @api private
def register_channel(channel)
  channel.tap { |opened| channels << opened }
end
register_connection(connection) click to toggle source

@param connection [AMQP::Connection] @return [AMQP::Connection] @api private

# File lib/messaging/client.rb, line 179
# Append +connection+ to the connection registry and hand it back.
#
# @param connection [AMQP::Connection]
# @return [AMQP::Connection] the same object that was passed in
# @api private
def register_connection(connection)
  connection.tap { |established| connections << established }
end