class RackRabbit::Adapter::AMQP

Attributes

channel[RW]
connection[RW]

Public Instance Methods

ack(delivery_tag) click to toggle source
# File lib/rack-rabbit/adapter/amqp.rb, line 64
def ack(delivery_tag)
  channel.acknowledge(delivery_tag, false)
end
connect() click to toggle source
# File lib/rack-rabbit/adapter/amqp.rb, line 25
def connect
  return if connected?
  @connection = ::AMQP.connect(connection_options)
  @channel = ::AMQP::Channel.new(connection)
  channel.prefetch(1)
end
connected?() click to toggle source
# File lib/rack-rabbit/adapter/amqp.rb, line 37
def connected?
  !@connection.nil?
end
disconnect() click to toggle source
# File lib/rack-rabbit/adapter/amqp.rb, line 32
def disconnect
  channel.close unless channel.nil?
  connection.close unless connection.nil?
end
publish(payload, properties) click to toggle source
# File lib/rack-rabbit/adapter/amqp.rb, line 52
def publish(payload, properties)
  exchange = get_exchange(properties.delete(:exchange), properties.delete(:exchange_type))
  exchange ||= channel.default_exchange
  exchange.publish(payload || "", properties)
end
reject(delivery_tag) click to toggle source
# File lib/rack-rabbit/adapter/amqp.rb, line 68
def reject(delivery_tag)
  channel.reject(delivery_tag, false)
end
shutdown() click to toggle source
# File lib/rack-rabbit/adapter/amqp.rb, line 17
def shutdown
  shutdown_eventmachine
end
started?() click to toggle source
# File lib/rack-rabbit/adapter/amqp.rb, line 21
def started?
  !@thread.nil?
end
startup() click to toggle source
# File lib/rack-rabbit/adapter/amqp.rb, line 13
def startup
  startup_eventmachine
end
subscribe(options = {}) { |message| ... } click to toggle source
# File lib/rack-rabbit/adapter/amqp.rb, line 41
def subscribe(options = {}, &block)
  queue = get_queue(options.delete(:queue)) || channel.queue("", :exclusive => true)
  exchange = get_exchange(options.delete(:exchange), options.delete(:exchange_type))
  if exchange
    queue.bind(exchange, :routing_key => options.delete(:routing_key))
  end
  queue.subscribe(options) do |properties, payload|
    yield Message.new(properties.delivery_tag, properties, payload, self)
  end
end
with_reply_queue() { |reply_queue| ... } click to toggle source
# File lib/rack-rabbit/adapter/amqp.rb, line 58
def with_reply_queue
  channel.queue("", :exclusive => true, :auto_delete => true) do |reply_queue, declare_ok|
    yield reply_queue
  end
end

Private Instance Methods

get_exchange(ex = :default, type = :direct) click to toggle source
# File lib/rack-rabbit/adapter/amqp.rb, line 92
def get_exchange(ex = :default, type = :direct)
  case ex
  when ::AMQP::Exchange then ex
  when Symbol, String    then channel.send(type || :direct, ex) unless ex.to_s.downcase.to_sym == :default
  else
    nil
  end
end
get_queue(q) click to toggle source
# File lib/rack-rabbit/adapter/amqp.rb, line 101
def get_queue(q)
  case q
  when ::AMQP::Queue then q
  when Symbol, String then channel.queue(q)
  else
    nil
  end
end
shutdown_eventmachine() click to toggle source
# File lib/rack-rabbit/adapter/amqp.rb, line 86
def shutdown_eventmachine
  sleep(1) # warmdown
  EventMachine.stop
  @thread = nil
end
startup_eventmachine() click to toggle source
PRIVATE IMPLEMENTATION
# File lib/rack-rabbit/adapter/amqp.rb, line 78
def startup_eventmachine
  raise RuntimeError, "already started" if started?
  ready = false
  @thread = Thread.new { EventMachine.run { ready = true } }
  sleep(1) until ready
  sleep(1) # warmup
end