class Tochtli::RabbitConnection

Constants

DEFAULT_CONNECTION_NAME

Attributes

connection[RW]
exchange_name[R]
logger[R]

Public Class Methods

close(name=nil) click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 43
def self.close(name=nil)
  name ||= defined?(Rails) ? Rails.env : nil
  raise ArgumentError, "RabbitMQ configuration name not specified" unless name
  connection = self.connections.delete(name.to_sym)
  connection.disconnect if connection && connection.open?
end
new(config = nil, channel_pool=nil) click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 16
def initialize(config = nil, channel_pool=nil)
  @config         = config.is_a?(RabbitConnection::Config) ? config : RabbitConnection::Config.load(nil, config)
  @exchange_name  = @config.delete(:exchange_name)
  @work_pool_size = @config.delete(:work_pool_size)
  @logger         = @config.delete(:logger) || Tochtli.logger
  @channel_pool   = channel_pool ? channel_pool : Hash.new
end
open(name=nil, config=nil) { |connection| ... } click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 24
def self.open(name=nil, config=nil)
  name ||= defined?(Rails) ? Rails.env : DEFAULT_CONNECTION_NAME
  raise ArgumentError, "RabbitMQ configuration name not specified" if !name && !ENV.has_key?('RABBITMQ_URL')
  connection = self.connections[name.to_sym]
  if !connection || !connection.open?
    config     = config.is_a?(RabbitConnection::Config) ? config : RabbitConnection::Config.load(name, config)
    connection = new(config)
    connection.connect
    self.connections[name.to_sym] = connection
  end

  if block_given?
    yield connection
    close name
  else
    connection
  end
end

Public Instance Methods

ack(delivery_tag) click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 117
def ack(delivery_tag)
  channel.ack(delivery_tag, false)
end
channel(thread=Thread.current) click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 101
def channel(thread=Thread.current)
  channel_wrap(thread).channel
end
connect(opts={}) { || ... } click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 50
def connect(opts={})
  return if open?

  defaults = {}
  unless opts[:logger]
    defaults[:logger]       = @logger.dup
    defaults[:logger].level = Tochtli.debug_bunny ? Logger::DEBUG : Logger::WARN
  end

  setup_bunny_connection(defaults.merge(opts))

  if block_given?
    yield
    disconnect if open?
  end
end
create_channel(consumer_pool_size = 1) click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 141
def create_channel(consumer_pool_size = 1)
  @connection.create_channel(nil, consumer_pool_size).tap do |channel|
    channel.confirm_select # use publisher confirmations
  end
end
create_exchange(channel) click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 147
def create_exchange(channel)
  channel.topic(@exchange_name, durable: true)
end
create_reply_queue() click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 89
def create_reply_queue
  Tochtli::ReplyQueue.new(self, @logger)
end
disconnect() click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 67
def disconnect
  @connection.close if @connection
rescue Bunny::ClientTimeout
  false
ensure
  @channel_pool.clear
  @connection  = nil
  @reply_queue = nil
end
exchange(thread=Thread.current) click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 97
def exchange(thread=Thread.current)
  channel_wrap(thread).exchange
end
open?() click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 77
def open?
  @connection && @connection.open?
end
publish(routing_key, message, options={}) click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 121
def publish(routing_key, message, options={})
  begin
    payload = message.to_json
  rescue Exception
    logger.error "Unable to serialize message: #{message.inspect}"
    logger.error $!
    raise "Unable to serialize message to JSON: #{$!}"
  end

  exchange.publish(payload, {
      routing_key:  routing_key,
      persistent:   true,
      mandatory:    true,
      timestamp:    Time.now.to_i,
      message_id:   message.id,
      type:         message.class.name.underscore,
      content_type: "application/json"
  }.merge(options))
end
queue(name, routing_keys=[], options={}) click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 105
def queue(name, routing_keys=[], options={})
  queue = channel.queue(name, {durable: true}.merge(options))
  routing_keys.each do |routing_key|
    queue.bind(exchange, routing_key: routing_key)
  end
  queue
end
queue_exists?(name) click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 113
def queue_exists?(name)
  @connection.queue_exists?(name)
end
reply_queue() click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 93
def reply_queue
  @reply_queue ||= create_reply_queue
end
setup_bunny_connection(opts={}) click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 81
def setup_bunny_connection(opts={})
  @connection = Bunny.new(@config, opts)
  @connection.start
rescue Bunny::TCPConnectionFailed => ex
  connection_url = "amqp://#{@connection.user}@#{@connection.host}:#{@connection.port}/#{@connection.vhost}"
  raise ConnectionFailed.new("Unable to connect to: '#{connection_url}' (#{ex.message})")
end

Private Instance Methods

channel_wrap(thread=Thread.current) click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 181
def channel_wrap(thread=Thread.current)
  channel_wrap = @channel_pool[thread.object_id]
  if channel_wrap && channel_wrap.channel.active
    channel_wrap
  else
    @channel_pool.delete(thread.object_id) # ensure inactive channel s not cached
    create_channel_wrap(thread)
  end
end
create_channel_wrap(thread=Thread.current) click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 165
def create_channel_wrap(thread=Thread.current)
  raise ConnectionFailed.new("Channel already created for thread #{thread.object_id}") if @channel_pool[thread.object_id]
  raise ConnectionFailed.new("Unable to create channel. Connection lost.") unless @connection

  channel  = create_channel(@work_pool_size)
  exchange = create_exchange(channel)
  exchange.on_return &method(:on_return)

  channel_wrap                    = ChannelWrap.new(channel, exchange)
  @channel_pool[thread.object_id] = channel_wrap

  channel_wrap
rescue Bunny::PreconditionFailed => ex
  raise ConnectionFailed.new("Unable create exchange: '#{@exchange_name}': #{ex.message}")
end
generate_id() click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 191
def generate_id
  SecureRandom.uuid
end
on_return(return_info, properties, payload) click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 153
def on_return(return_info, properties, payload)
  unless properties[:correlation_id]
    error_message = "Message #{properties[:message_id]} dropped: #{return_info[:reply_text]} [#{return_info[:reply_code]}]"
    reply_queue.handle_reply MessageDropped.new(error_message, payload), properties[:message_id]
  else # a reply dropped - client reply queue probably does not exist any more
    logger.debug "Reply on message #{properties[:correlation_id]} dropped: #{return_info[:reply_text]} [#{return_info[:reply_code]}]"
  end
rescue
  logger.error "Internal error (on_return): #{$!}"
  logger.error $!.backtrace.join("\n")
end