class DelayedJobCelluloid::Manager
Attributes
busy[R]
ready[R]
Public Class Methods
new(options={}, worker_count)
click to toggle source
# File lib/delayed_job_celluloid/manager.rb, line 13
#
# Sets up the manager's bookkeeping and creates the initial set of linked
# workers, all of which start out in the ready list.
#
# @param options [Hash] passed unchanged to every Worker.new_link call
# @param worker_count [Integer, nil] number of workers to create; nil (or
#   false) falls back to 1
def initialize(options={}, worker_count)
  @options = options
  @worker_count = worker_count || 1
  @done_callback = nil
  @in_progress = {}
  @threads = {}   # proxy_id => thread, filled in by real_thread
  @done = false   # flipped to true by stop
  @busy = []
  @ready = @worker_count.times.map do
    w = Worker.new_link(options, current_actor)
    # Store the worker's object_id on the worker itself as its proxy_id.
    # NOTE(review): presumably the worker reports this id back through
    # real_thread so hard_shutdown_in can find its thread -- confirm in Worker.
    w.proxy_id = w.object_id
    w
  end
end
Public Instance Methods
hard_shutdown_in(delay)
click to toggle source
# File lib/delayed_job_celluloid/manager.rb, line 86
#
# Logs that shutdown is pausing, then schedules (via `after(delay)`) a block
# that raises Shutdown in the recorded thread of every busy worker that is
# still alive, and finally signals :shutdown.
#
# @param delay [Numeric] value handed to `after`; also interpolated into the
#   log message as a number of seconds
def hard_shutdown_in(delay)
  info "Pausing up to #{delay} seconds to allow workers to finish..."

  after(delay) do
    # We've reached the timeout and we still have busy workers.
    # They must die but their messages shall live on.
    info "Still waiting for #{@busy.size} busy workers"
    debug "Terminating #{@busy.size} busy worker threads"

    @busy.each do |worker|
      next unless worker.alive?

      # Threads are looked up (and removed) by the worker's object_id; a
      # worker with no recorded thread is skipped.
      thread = @threads.delete(worker.object_id)
      thread.raise Shutdown if thread
    end

    after(0) { signal(:shutdown) }
  end
end
real_thread(proxy_id, thr)
click to toggle source
# File lib/delayed_job_celluloid/manager.rb, line 82
#
# Records the thread associated with a worker so it can be looked up later
# (hard_shutdown_in reads this table).
#
# @param proxy_id [Object] key to store the thread under
# @param thr [Thread] the thread to remember
# @return [Thread] the stored thread
def real_thread(proxy_id, thr)
  @threads[proxy_id] = thr
end
start()
click to toggle source
# File lib/delayed_job_celluloid/manager.rb, line 29
#
# Names every ready worker "delayed_job.<index>", logs the start through
# DelayedJobCelluloid.logger, and calls `async.start` on it.
#
# @return [Array] the ready list (the receiver of each_with_index)
def start
  @ready.each_with_index do |worker, index|
    worker.name = "delayed_job.#{index}"
    DelayedJobCelluloid.logger.info "Starting worker #{worker.name}"
    worker.async.start
  end
end
stop(timeout)
click to toggle source
# File lib/delayed_job_celluloid/manager.rb, line 37
#
# Marks the manager as done, terminates every idle worker that is still
# alive, and empties the ready list. With no busy workers left it schedules
# a :shutdown signal via `after(0)`; otherwise it delegates to
# hard_shutdown_in.
#
# @param timeout [Numeric] passed to hard_shutdown_in when workers are busy
def stop(timeout)
  @done = true

  info "Shutting down #{@ready.size} idle workers"
  @ready.each do |worker|
    worker.terminate if worker.alive?
  end
  @ready.clear

  # Nothing is busy, so there is nothing to wait for.
  return after(0) { signal(:shutdown) } if @busy.empty?

  hard_shutdown_in timeout
end
stopped?()
click to toggle source
# File lib/delayed_job_celluloid/manager.rb, line 78
#
# Reports whether shutdown has been requested.
#
# @return [Boolean] the value of @done (false after initialize, true once
#   stop has run)
def stopped?
  @done
end
work(worker)
click to toggle source
# File lib/delayed_job_celluloid/manager.rb, line 50
#
# Moves a worker from the ready list to the busy list.
#
# @param worker [Object] the worker that is now busy
# @return [Array] the busy list
def work(worker)
  @ready.delete(worker)
  @busy << worker
end
worker_died(worker, reason)
click to toggle source
# File lib/delayed_job_celluloid/manager.rb, line 65
#
# Handles a worker that has died. The worker is removed from the busy list
# and its recorded thread is forgotten. While the manager is running, a
# replacement worker is linked, queued as ready and started; once the manager
# is stopping, :shutdown is signalled as soon as no busy workers remain.
#
# @param worker [Object] the worker that died
# @param reason [Object, nil] cause of death; logged at debug level unless nil
def worker_died(worker, reason)
  debug "#{worker.inspect} died because of #{reason}" unless reason.nil?

  # Drop the dead worker's thread so the table does not keep a stale
  # reference. Keyed by object_id, the same key hard_shutdown_in uses.
  @threads.delete(worker.object_id)
  @busy.delete(worker)

  if stopped?
    signal(:shutdown) if @busy.empty?
  else
    replacement = Worker.new_link(@options, current_actor)
    # Give the replacement a proxy_id exactly as initialize does for the
    # original workers; previously restarted workers were left without one.
    replacement.proxy_id = replacement.object_id
    replacement.name = "restarted"
    @ready << replacement
    replacement.async.start
  end
end
worker_done(worker)
click to toggle source
# File lib/delayed_job_celluloid/manager.rb, line 55
#
# Handles a worker that has finished its job. The worker leaves the busy
# list; while the manager is running it goes back to the ready list (if still
# alive). Once the manager is stopping, the worker is terminated instead and
# :shutdown is signalled when no busy workers remain.
#
# @param worker [Object] the worker that finished
def worker_done(worker)
  @busy.delete(worker)

  if stopped?
    worker.terminate if worker.alive?
    signal(:shutdown) if @busy.empty?
  elsif worker.alive?
    @ready << worker
  end
end