class Requeus::Adapter::Rabbit

Public Class Methods

new(opts) click to toggle source
# File lib/requeus/adapter/rabbit.rb, line 8
def initialize opts
  @opts = opts
end

Public Instance Methods

confirm(queue, handle) click to toggle source
# File lib/requeus/adapter/rabbit.rb, line 29
def confirm queue, handle
  cq(queue).delivery_tag = handle
  cmd cq(queue), :ack
  true
end
get(queue, limit = 1) click to toggle source
# File lib/requeus/adapter/rabbit.rb, line 17
def get queue, limit = 1
  result = []
  requests = 0

  while requests <= limit && r = pop_request(cq(queue))
    result << r
    requests += 1
  end

  result
end
put(queue, request) click to toggle source
# File lib/requeus/adapter/rabbit.rb, line 12
def put queue, request
  cmd cq(queue), :publish, request, :persistent => true
  true
end

Private Instance Methods

client() click to toggle source
# File lib/requeus/adapter/rabbit.rb, line 66
def client
  Thread.current[:requeus_rabbit_client] ||= Carrot.new(
      :host => @opts['host'],
      :port => @opts['port'].to_i,
      :user => @opts['user'],
      :pass => @opts['pass'],
      :vhost => @opts['vhost']
  )
end
cmd(queue, command, *args) click to toggle source
# File lib/requeus/adapter/rabbit.rb, line 46
def cmd queue, command, *args
  retried = false
  begin
    queue.send(command, *args)
  rescue Carrot::AMQP::Server::ServerDown => e
    puts 'Reconnecting...'
    unless retried
      drop_client
      retried = true
      retry
    else
      raise e
    end
  end
end
cq(name) click to toggle source
# File lib/requeus/adapter/rabbit.rb, line 37
def cq name
  client.queue(name, :durable => true)
end
drop_client() click to toggle source
# File lib/requeus/adapter/rabbit.rb, line 62
def drop_client
  Thread.current[:requeus_rabbit_client] = nil
end
pop_request(queue) click to toggle source
# File lib/requeus/adapter/rabbit.rb, line 41
def pop_request queue
  request = cmd queue, :pop
  [queue.delivery_tag, request] if request
end