class RackRabbit::Adapter::AMQP
Attributes
channel[RW]
connection[RW]
Public Instance Methods
ack(delivery_tag)
click to toggle source
# File lib/rack-rabbit/adapter/amqp.rb, line 64
# Acknowledge the delivery identified by +delivery_tag+ on the channel.
#
# delivery_tag - the tag of the delivery to acknowledge.
#
# Returns whatever channel.acknowledge returns.
def ack(delivery_tag)
  # NOTE(review): the second argument is presumably the amqp gem's
  # "multiple" flag (false = only this delivery) - confirm against the gem.
  multiple = false
  channel.acknowledge(delivery_tag, multiple)
end
connect()
click to toggle source
# File lib/rack-rabbit/adapter/amqp.rb, line 25
# Open the AMQP connection and a channel on it, unless already connected.
#
# Stores the connection in @connection and the channel in @channel, then
# sets the channel prefetch to 1. Does nothing (returns nil) when
# connected? is already true.
def connect
  unless connected?
    @connection = ::AMQP.connect(connection_options)
    @channel = ::AMQP::Channel.new(connection)
    # NOTE(review): prefetch(1) presumably limits the channel to one
    # unacknowledged delivery at a time - confirm against the amqp gem.
    channel.prefetch(1)
  end
end
connected?()
click to toggle source
# File lib/rack-rabbit/adapter/amqp.rb, line 37
# Report whether a connection object has been stored.
#
# Returns true when @connection is anything other than nil, false otherwise.
# Only the presence of the object is checked, not the state of the link.
def connected?
  return false if @connection.nil?

  true
end
disconnect()
click to toggle source
# File lib/rack-rabbit/adapter/amqp.rb, line 32
# Close the channel and the connection (when present) and forget them.
#
# The references are cleared after closing so that connected? reports
# false again and a later connect opens a fresh connection instead of
# returning early while holding closed handles.
#
# Safe to call when nothing is connected. Returns nil.
def disconnect
  channel.close unless channel.nil?
  connection.close unless connection.nil?
  @channel = nil
  @connection = nil
end
publish(payload, properties)
click to toggle source
# File lib/rack-rabbit/adapter/amqp.rb, line 52
# Publish +payload+ to an exchange.
#
# payload    - the message body; nil is published as an empty string.
# properties - Hash of publish properties. The :exchange and :exchange_type
#              entries select the target exchange (via get_exchange) and are
#              not forwarded; everything else is passed to exchange.publish.
#
# When get_exchange yields nothing, the channel's default exchange is used.
# The caller's hash is left untouched: the routing keys are removed from a
# shallow copy, so the same properties hash can be reused (or be frozen).
def publish(payload, properties)
  properties = properties.dup
  exchange = get_exchange(properties.delete(:exchange), properties.delete(:exchange_type))
  exchange ||= channel.default_exchange
  exchange.publish(payload || "", properties)
end
reject(delivery_tag)
click to toggle source
# File lib/rack-rabbit/adapter/amqp.rb, line 68
# Reject the delivery identified by +delivery_tag+ on the channel.
#
# delivery_tag - the tag of the delivery to reject.
#
# Returns whatever channel.reject returns.
def reject(delivery_tag)
  # NOTE(review): the second argument is presumably the amqp gem's
  # "requeue" flag (false = do not requeue) - confirm against the gem.
  requeue = false
  channel.reject(delivery_tag, requeue)
end
shutdown()
click to toggle source
# File lib/rack-rabbit/adapter/amqp.rb, line 17
# Shut the adapter down.
#
# Public entry point that simply delegates to the private
# shutdown_eventmachine helper and returns its result.
def shutdown
  shutdown_eventmachine
end
started?()
click to toggle source
# File lib/rack-rabbit/adapter/amqp.rb, line 21
# Report whether the adapter has been started.
#
# Returns true when @thread holds anything other than nil, false otherwise.
def started?
  return false if @thread.nil?

  true
end
startup()
click to toggle source
# File lib/rack-rabbit/adapter/amqp.rb, line 13
# Start the adapter.
#
# Public entry point that simply delegates to the private
# startup_eventmachine helper and returns its result.
def startup
  startup_eventmachine
end
subscribe(options = {}) { |message| ... }
click to toggle source
# File lib/rack-rabbit/adapter/amqp.rb, line 41
# Subscribe to a queue and yield each delivery as a Message.
#
# options - Hash of subscription options (default: {}):
#           :queue         - queue (or queue name) resolved via get_queue;
#                            when that yields nothing, an exclusive queue
#                            with an empty name is declared on the channel.
#           :exchange,
#           :exchange_type - resolved via get_exchange; when an exchange is
#                            found the queue is bound to it.
#           :routing_key   - routing key used for that binding.
#           Remaining entries are passed through to queue.subscribe.
#
# Yields a Message built from the delivery tag, the delivery properties,
# the payload and this adapter.
#
# The caller's hash is left untouched: the keys consumed here are removed
# from a shallow copy, so the same options can be reused for another
# subscription.
def subscribe(options = {}, &block)
  options = options.dup
  queue = get_queue(options.delete(:queue)) || channel.queue("", :exclusive => true)
  exchange = get_exchange(options.delete(:exchange), options.delete(:exchange_type))
  # :routing_key is only consumed when there is an exchange to bind to.
  queue.bind(exchange, :routing_key => options.delete(:routing_key)) if exchange
  queue.subscribe(options) do |properties, payload|
    yield Message.new(properties.delivery_tag, properties, payload, self)
  end
end
with_reply_queue() { |reply_queue| ... }
click to toggle source
# File lib/rack-rabbit/adapter/amqp.rb, line 58
# Declare a queue for replies and yield it to the caller's block.
#
# The queue is declared on the channel with an empty name and the
# :exclusive and :auto_delete options set to true.
#
# Yields the declared reply queue.
def with_reply_queue
  queue_options = { :exclusive => true, :auto_delete => true }
  channel.queue("", queue_options) do |reply_queue, _declare_ok|
    yield reply_queue
  end
end
Private Instance Methods
get_exchange(ex = :default, type = :direct)
click to toggle source
# File lib/rack-rabbit/adapter/amqp.rb, line 92
# Resolve +ex+ to an exchange object.
#
# ex   - an ::AMQP::Exchange (returned as-is), or a Symbol/String exchange
#        name (default: :default).
# type - name of the channel method used to declare a named exchange
#        (default: :direct; nil also falls back to :direct).
#
# Returns the exchange, or nil when +ex+ is named "default" (compared
# case-insensitively) or is any other kind of object.
def get_exchange(ex = :default, type = :direct)
  case ex
  when ::AMQP::Exchange
    ex
  when Symbol, String
    # A name of "default" means "no explicit exchange".
    return nil if ex.to_s.downcase == "default"

    channel.send(type || :direct, ex)
  end
end
get_queue(q)
click to toggle source
# File lib/rack-rabbit/adapter/amqp.rb, line 101
# Resolve +q+ to a queue object.
#
# q - an ::AMQP::Queue (returned as-is) or a Symbol/String queue name
#     (looked up through channel.queue).
#
# Returns the queue, or nil for any other kind of argument.
def get_queue(q)
  return q if q.is_a?(::AMQP::Queue)

  channel.queue(q) if q.is_a?(Symbol) || q.is_a?(String)
end
shutdown_eventmachine()
click to toggle source
# File lib/rack-rabbit/adapter/amqp.rb, line 86
# Stop EventMachine and forget the stored thread.
#
# Sleeps for one second first (the original source labels this pause
# "warmdown"), then calls EventMachine.stop and resets @thread to nil so
# that started? reports false.
def shutdown_eventmachine
  warmdown_seconds = 1
  sleep(warmdown_seconds)
  EventMachine.stop
  @thread = nil
end
startup_eventmachine()
click to toggle source