class Tochtli::RabbitConnection
Constants
- DEFAULT_CONNECTION_NAME
Attributes
connection[RW]
exchange_name[R]
logger[R]
Public Class Methods
close(name=nil)
click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 43
# Removes the named connection from the class-level registry and disconnects
# it when it is still open.
#
# When +name+ is omitted it is resolved exactly like ::open resolves it
# (Rails.env when Rails is loaded, DEFAULT_CONNECTION_NAME otherwise), so a
# connection created by a bare +open+ can be released by a bare +close+.
#
# @param name [String, Symbol, nil] registry key of the connection to close
# @return the result of the connection's +disconnect+, or nil when nothing
#   was open under that name
# @raise [ArgumentError] when no name can be resolved
def self.close(name=nil)
  name ||= defined?(Rails) ? Rails.env : DEFAULT_CONNECTION_NAME
  raise ArgumentError, "RabbitMQ configuration name not specified" unless name

  # Delete first so the registry never keeps a reference to a closed connection.
  connection = self.connections.delete(name.to_sym)
  connection.disconnect if connection && connection.open?
end
new(config = nil, channel_pool=nil)
click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 16
# Builds a connection wrapper from a configuration.
#
# +config+ is used as-is when it already is a RabbitConnection::Config;
# anything else is handed to RabbitConnection::Config.load. The
# :exchange_name, :work_pool_size and :logger entries are removed from the
# configuration and kept on the instance instead.
#
# @param config [RabbitConnection::Config, Object, nil] connection settings
# @param channel_pool [Hash, nil] store for channel wrappers; a fresh Hash is
#   used when none is given
def initialize(config = nil, channel_pool=nil)
  @config =
    if config.is_a?(RabbitConnection::Config)
      config
    else
      RabbitConnection::Config.load(nil, config)
    end

  @exchange_name  = @config.delete(:exchange_name)
  @work_pool_size = @config.delete(:work_pool_size)
  # Fall back to the library-wide logger when the config carries none.
  @logger         = @config.delete(:logger) || Tochtli.logger
  @channel_pool   = channel_pool || Hash.new
end
open(name=nil, config=nil) { |connection| ... }
click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 24
# Returns the registered connection for +name+, creating and connecting a new
# one when none is registered or the registered one is no longer open.
#
# Without a block the connection is returned and stays registered. With a
# block the connection is yielded and then closed via ::close, even when the
# block raises, so a failing block cannot leave the connection open; the
# block's value is returned.
#
# @param name [String, Symbol, nil] registry key; defaults to Rails.env when
#   Rails is loaded, otherwise DEFAULT_CONNECTION_NAME
# @param config [RabbitConnection::Config, Object, nil] used as-is when it is
#   a Config, otherwise passed to RabbitConnection::Config.load
# @yield [connection] the open connection
# @return the connection (no block) or the block's value (block given)
# @raise [ArgumentError] when no name is resolved and RABBITMQ_URL is not set
def self.open(name=nil, config=nil)
  name ||= defined?(Rails) ? Rails.env : DEFAULT_CONNECTION_NAME
  raise ArgumentError, "RabbitMQ configuration name not specified" if !name && !ENV.has_key?('RABBITMQ_URL')

  # NOTE(review): if name is still nil here (RABBITMQ_URL set, no default
  # name), name.to_sym below raises NoMethodError - confirm the intended key.
  connection = self.connections[name.to_sym]
  if !connection || !connection.open?
    config = config.is_a?(RabbitConnection::Config) ? config : RabbitConnection::Config.load(name, config)
    connection = new(config)
    connection.connect
    self.connections[name.to_sym] = connection
  end

  return connection unless block_given?

  begin
    yield connection
  ensure
    # Always release the connection, including when the block raised.
    close name
  end
end
Public Instance Methods
ack(delivery_tag)
click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 117
# Acknowledges one delivery on the channel returned by #channel.
#
# @param delivery_tag the tag identifying the delivery to acknowledge
def ack(delivery_tag)
  multiple = false # Bunny's "multiple" flag: acknowledge only this delivery
  channel.ack(delivery_tag, multiple)
end
channel(thread=Thread.current)
click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 101
# Returns the channel held by the channel wrapper of the given thread.
#
# @param thread [Thread] thread whose channel is wanted (current thread by default)
def channel(thread=Thread.current)
  wrap = channel_wrap(thread)
  wrap.channel
end
connect(opts={}) { || ... }
click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 50
# Opens the underlying Bunny connection unless it is already open.
#
# Unless the caller supplies opts[:logger], a copy of this connection's logger
# is passed on, with its level set to DEBUG when Tochtli.debug_bunny is truthy
# and WARN otherwise.
#
# With a block, the block runs while the connection is open and the
# connection is disconnected afterwards, even when the block raises, so a
# failing block cannot leave the connection open; the block's value is
# returned.
#
# @param opts [Hash] options merged over the defaults and handed to
#   #setup_bunny_connection
# @return [nil] when already open or when no block is given
def connect(opts={})
  return if open?

  defaults = {}
  unless opts[:logger]
    # Work on a copy so changing the level does not affect @logger itself.
    defaults[:logger] = @logger.dup
    defaults[:logger].level = Tochtli.debug_bunny ? Logger::DEBUG : Logger::WARN
  end

  setup_bunny_connection(defaults.merge(opts))
  return unless block_given?

  begin
    yield
  ensure
    disconnect if open?
  end
end
create_channel(consumer_pool_size = 1)
click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 141
# Creates a new channel on the underlying connection and switches it to
# publisher-confirmation mode.
#
# @param consumer_pool_size [Integer] forwarded as the second argument of the
#   connection's +create_channel+
# @return the newly created channel
def create_channel(consumer_pool_size = 1)
  # nil as the first argument leaves the channel number up to Bunny.
  new_channel = @connection.create_channel(nil, consumer_pool_size)
  new_channel.confirm_select # use publisher confirmations
  new_channel
end
create_exchange(channel)
click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 147
# Declares the durable topic exchange named by @exchange_name on the given
# channel.
#
# @param channel the channel on which to declare the exchange
# @return whatever the channel's +topic+ call returns
def create_exchange(channel)
  name = @exchange_name
  channel.topic(name, durable: true)
end
create_reply_queue()
click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 89
# Builds a new Tochtli::ReplyQueue for this connection, sharing its logger.
#
# @return [Tochtli::ReplyQueue]
def create_reply_queue
  owner = self
  Tochtli::ReplyQueue.new(owner, @logger)
end
disconnect()
click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 67
# Closes the underlying connection, if any, and resets per-connection state.
#
# The channel pool is cleared and the connection and reply queue references
# are dropped whether or not closing succeeded.
#
# @return the result of closing the connection, nil when there was no
#   connection, or false when closing timed out (Bunny::ClientTimeout)
def disconnect
  begin
    @connection.close if @connection
  rescue Bunny::ClientTimeout
    false
  ensure
    @channel_pool.clear
    @connection = nil
    @reply_queue = nil
  end
end
exchange(thread=Thread.current)
click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 97
# Returns the exchange held by the channel wrapper of the given thread.
#
# @param thread [Thread] thread whose exchange is wanted (current thread by default)
def exchange(thread=Thread.current)
  wrap = channel_wrap(thread)
  wrap.exchange
end
open?()
click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 77
# Tells whether there is an underlying connection and it reports itself open.
#
# @return the connection's +open?+ answer, or the falsy @connection itself
#   (nil) when no connection has been set up
def open?
  return @connection unless @connection

  @connection.open?
end
publish(routing_key, message, options={})
click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 121
# Serializes +message+ to JSON and publishes it on the exchange returned by
# #exchange.
#
# Default publish properties (persistent, mandatory, timestamp, message id,
# type, content type) can be overridden or extended through +options+.
#
# Only StandardError raised during serialization is logged and wrapped;
# interrupts, SystemExit and similar are left to propagate untouched instead
# of being turned into a RuntimeError.
#
# @param routing_key the routing key to publish with
# @param message an object responding to +to_json+ and +id+
# @param options [Hash] extra publish properties merged over the defaults
# @raise [RuntimeError] when the message cannot be serialized to JSON
def publish(routing_key, message, options={})
  begin
    payload = message.to_json
  rescue StandardError => ex
    logger.error "Unable to serialize message: #{message.inspect}"
    logger.error ex
    raise "Unable to serialize message to JSON: #{ex}"
  end

  exchange.publish(payload, {
    routing_key:  routing_key,
    persistent:   true,
    mandatory:    true,
    timestamp:    Time.now.to_i,
    message_id:   message.id,
    type:         message.class.name.underscore,
    content_type: "application/json"
  }.merge(options))
end
queue(name, routing_keys=[], options={})
click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 105
# Declares a queue on the channel returned by #channel and binds it to the
# exchange returned by #exchange, once per routing key.
#
# The queue is declared durable unless +options+ overrides that.
# +routing_keys+ is coerced with Array(), so nil (no bindings) and a single
# routing key are accepted as well as a list.
#
# @param name the queue name
# @param routing_keys [Array, String, nil] routing key(s) to bind
# @param options [Hash] queue options merged over { durable: true }
# @return the declared queue
def queue(name, routing_keys=[], options={})
  declared = channel.queue(name, {durable: true}.merge(options))
  Array(routing_keys).each do |routing_key|
    declared.bind(exchange, routing_key: routing_key)
  end
  declared
end
queue_exists?(name)
click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 113
# Asks the underlying connection whether a queue with the given name exists.
#
# @param name the queue name to look up
# @return the connection's +queue_exists?+ answer
def queue_exists?(name)
  session = @connection
  session.queue_exists?(name)
end
reply_queue()
click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 93
# Returns the memoized reply queue, creating it through #create_reply_queue
# on first use.
def reply_queue
  @reply_queue = create_reply_queue unless @reply_queue
  @reply_queue
end
setup_bunny_connection(opts={})
click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 81
# Creates the Bunny connection from @config and +opts+ and starts it.
#
# @param opts [Hash] extra options passed to Bunny.new as its second argument
# @raise [ConnectionFailed] when Bunny reports Bunny::TCPConnectionFailed; the
#   message names the user, host, port and vhost that were tried
def setup_bunny_connection(opts={})
  @connection = Bunny.new(@config, opts)
  @connection.start
rescue Bunny::TCPConnectionFailed => ex
  session = @connection
  connection_url = "amqp://#{session.user}@#{session.host}:#{session.port}/#{session.vhost}"
  failure = ConnectionFailed.new("Unable to connect to: '#{connection_url}' (#{ex.message})")
  raise failure
end
Private Instance Methods
channel_wrap(thread=Thread.current)
click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 181
# Returns the channel wrapper cached for +thread+ when its channel is still
# active; otherwise drops the cached entry and creates a new wrapper.
#
# The pool is keyed by the thread's object_id.
#
# @param thread [Thread] thread whose wrapper is wanted (current thread by default)
def channel_wrap(thread=Thread.current)
  key = thread.object_id
  cached = @channel_pool[key]
  return cached if cached && cached.channel.active

  @channel_pool.delete(key) # ensure an inactive channel is not cached
  create_channel_wrap(thread)
end
create_channel_wrap(thread=Thread.current)
click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 165
# Creates a channel and its exchange for +thread+, registers #on_return as the
# exchange's return handler, and stores both in the pool as a ChannelWrap.
#
# @param thread [Thread] thread the wrapper is created for (current thread by default)
# @return [ChannelWrap] the wrapper that was stored in the pool
# @raise [ConnectionFailed] when the thread already has a wrapper, when there
#   is no connection, or when Bunny raises Bunny::PreconditionFailed
def create_channel_wrap(thread=Thread.current)
  key = thread.object_id
  raise ConnectionFailed.new("Channel already created for thread #{key}") if @channel_pool[key]
  raise ConnectionFailed.new("Unable to create channel. Connection lost.") unless @connection

  channel  = create_channel(@work_pool_size)
  exchange = create_exchange(channel)
  exchange.on_return(&method(:on_return))

  # The assignment's value (the new wrapper) is also the method's result.
  @channel_pool[key] = ChannelWrap.new(channel, exchange)
rescue Bunny::PreconditionFailed => ex
  raise ConnectionFailed.new("Unable create exchange: '#{@exchange_name}': #{ex.message}")
end
generate_id()
click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 191
# Generates a new random identifier.
#
# @return [String] a UUID from SecureRandom.uuid
def generate_id
  uuid = SecureRandom.uuid
  uuid
end
on_return(return_info, properties, payload)
click to toggle source
# File lib/tochtli/rabbit_connection.rb, line 153
# Handler for messages returned by the broker.
#
# A returned message with a :correlation_id is a dropped reply and is only
# logged at debug level. Any other returned message is reported to the reply
# queue as a MessageDropped carrying the payload, keyed by the message id.
# Errors raised while handling are logged and not re-raised.
#
# @param return_info return details; :reply_text and :reply_code are read
# @param properties message properties; :correlation_id and :message_id are read
# @param payload the returned message body
def on_return(return_info, properties, payload)
  if properties[:correlation_id]
    # a reply dropped - client reply queue probably does not exist any more
    logger.debug "Reply on message #{properties[:correlation_id]} dropped: #{return_info[:reply_text]} [#{return_info[:reply_code]}]"
  else
    error_message = "Message #{properties[:message_id]} dropped: #{return_info[:reply_text]} [#{return_info[:reply_code]}]"
    dropped = MessageDropped.new(error_message, payload)
    reply_queue.handle_reply dropped, properties[:message_id]
  end
rescue
  logger.error "Internal error (on_return): #{$!}"
  logger.error $!.backtrace.join("\n")
end