class Requeus::Queue
Attributes
endpoint[R]
name[R]
Public Class Methods
new(conf)
click to toggle source
# File lib/requeus/queue.rb, line 3 def initialize conf @name = conf['name'] @endpoint = conf['endpoint'] @workers_count = conf['workers'].to_i @interval = conf['interval'].to_f end
Public Instance Methods
start_workers()
click to toggle source
# File lib/requeus/queue.rb, line 13 def start_workers workers_queue = SizedQueue.new(@workers_count) @workers_count.times.map do Thread.new do loop do begin server, queue, handle, request = workers_queue.pop if request.do_request queue.endpoint server.confirm queue.name, handle request.delete_files end rescue Exception => e puts e puts e.backtrace.join("\n") end end end end Requeus::Impl.instance.server_sequence.map do |s| Thread.new(s) do |server| loop do begin requests = server.get @name, workers_queue.num_waiting if requests.empty? sleep server.interval * @interval else requests.each {|handle, request| workers_queue << [server, self, handle, Requeus::Request.from_json(request)]} end rescue Exception => e puts e puts e.backtrace.join("\n") end end end end end