class Requeus::Adapter::Rabbit
Public Class Methods
new(opts)
click to toggle source
# File lib/requeus/adapter/rabbit.rb, line 8
# Builds the adapter and keeps the given options for later use.
#
# opts - connection settings; stored as-is in @opts and read lazily
#        when a client is first needed.
def initialize(opts)
  @opts = opts
end
Public Instance Methods
confirm(queue, handle)
click to toggle source
# File lib/requeus/adapter/rabbit.rb, line 29
# Acknowledges a previously fetched message.
#
# queue  - name of the queue the message was fetched from.
# handle - value assigned to the queue's delivery_tag before acking.
#
# Returns true. Errors raised by the ack command propagate to the caller.
def confirm(queue, handle)
  # Resolve the queue once so the delivery tag and the ack are guaranteed
  # to hit the same object, even if the lookup is not memoised.
  target = cq(queue)
  target.delivery_tag = handle
  cmd(target, :ack)
  true
end
get(queue, limit = 1)
click to toggle source
# File lib/requeus/adapter/rabbit.rb, line 17
# Fetches at most +limit+ pending requests from a queue.
#
# queue - name of the queue to read from.
# limit - maximum number of requests to return (default 1).
#
# Returns an Array of whatever pop_request yields for each message; the
# array is shorter than +limit+ (possibly empty) when the queue runs dry.
def get(queue, limit = 1)
  result = []
  # Bound the loop by the number collected so far: the previous
  # `requests <= limit` check popped one message more than requested.
  while result.size < limit && (request = pop_request(cq(queue)))
    result << request
  end
  result
end
put(queue, request)
click to toggle source
# File lib/requeus/adapter/rabbit.rb, line 12
# Publishes a request to the named queue with the :persistent option set.
#
# queue   - name of the destination queue.
# request - payload handed to the queue's publish command.
#
# Returns true. Errors raised by the publish command propagate.
def put(queue, request)
  destination = cq(queue)
  cmd(destination, :publish, request, :persistent => true)
  true
end
Private Instance Methods
client()
click to toggle source
# File lib/requeus/adapter/rabbit.rb, line 66
# Returns the Carrot client cached on Thread.current under
# :requeus_rabbit_client, creating it on first use.
#
# The connection settings come from @opts, read with the string keys
# 'host', 'port', 'user', 'pass' and 'vhost'; the port is coerced with to_i.
def client
  existing = Thread.current[:requeus_rabbit_client]
  return existing if existing

  Thread.current[:requeus_rabbit_client] = Carrot.new(
    :host  => @opts['host'],
    :port  => @opts['port'].to_i,
    :user  => @opts['user'],
    :pass  => @opts['pass'],
    :vhost => @opts['vhost']
  )
end
cmd(queue, command, *args)
click to toggle source
# File lib/requeus/adapter/rabbit.rb, line 46
# Sends +command+ (with +args+) to +queue+, retrying once if the server
# is reported down.
#
# On Carrot::AMQP::Server::ServerDown it prints 'Reconnecting...'; the
# first time it also drops the cached client and retries the command, the
# second time it re-raises the error.
#
# Returns whatever the command returns.
def cmd(queue, command, *args)
  already_retried = false
  begin
    queue.send(command, *args)
  rescue Carrot::AMQP::Server::ServerDown => error
    puts 'Reconnecting...'
    # A single retry only: a second failure is handed to the caller.
    raise error if already_retried

    drop_client
    already_retried = true
    retry
  end
end
cq(name)
click to toggle source
# File lib/requeus/adapter/rabbit.rb, line 37
# Looks up the queue called +name+ on the current client, passing the
# :durable option set to true.
#
# Returns whatever the client's queue method returns.
def cq(name)
  connection = client
  connection.queue(name, :durable => true)
end
drop_client()
click to toggle source
# File lib/requeus/adapter/rabbit.rb, line 62
# Clears the client cached on Thread.current under :requeus_rabbit_client
# so the next lookup has to build a new one.
#
# Returns nil.
def drop_client
  current = Thread.current
  current[:requeus_rabbit_client] = nil
end
pop_request(queue)
click to toggle source
# File lib/requeus/adapter/rabbit.rb, line 41
# Pops one message from +queue+.
#
# Returns a two-element Array [queue.delivery_tag, message] when the pop
# yields a truthy message, otherwise nil.
def pop_request(queue)
  payload = cmd(queue, :pop)
  return unless payload

  [queue.delivery_tag, payload]
end