class DelayedJobCelluloid::Manager

Attributes

busy[R]
ready[R]

Public Class Methods

new(options={}, worker_count) click to toggle source
# File lib/delayed_job_celluloid/manager.rb, line 13
def initialize(options={}, worker_count)
  @options = options
  @worker_count = worker_count || 1
  @done_callback = nil

  @in_progress = {}
  @threads = {}
  @done = false
  @busy = []
  @ready = @worker_count.times.map do
    w = Worker.new_link(options, current_actor)
    w.proxy_id = w.object_id
    w
  end
end

Public Instance Methods

hard_shutdown_in(delay) click to toggle source
# File lib/delayed_job_celluloid/manager.rb, line 86
def hard_shutdown_in(delay)
  info "Pausing up to #{delay} seconds to allow workers to finish..." 

  after(delay) do
    # We've reached the timeout and we still have busy workers.
    # They must die but their messages shall live on.
    info "Still waiting for #{@busy.size} busy workers"

    debug "Terminating #{@busy.size} busy worker threads"
    @busy.each do |worker|
      if worker.alive? && t = @threads.delete(worker.object_id)
        t.raise Shutdown
      end
    end

    after(0) { signal(:shutdown) }
  end
end
real_thread(proxy_id, thr) click to toggle source
# File lib/delayed_job_celluloid/manager.rb, line 82
def real_thread(proxy_id, thr)
  @threads[proxy_id] = thr
end
start() click to toggle source
# File lib/delayed_job_celluloid/manager.rb, line 29
def start
  @ready.each_with_index do |worker, index|
    worker.name = "delayed_job.#{index}"
    DelayedJobCelluloid.logger.info "Starting worker #{worker.name}"
    worker.async.start 
  end
end
stop(timeout) click to toggle source
# File lib/delayed_job_celluloid/manager.rb, line 37
def stop(timeout)   
  @done = true
        
  info "Shutting down #{@ready.size} idle workers"
  @ready.each do |worker|
    worker.terminate if worker.alive?
  end
  @ready.clear
  
  return after(0) { signal(:shutdown) } if @busy.empty?
  hard_shutdown_in timeout
end
stopped?() click to toggle source
# File lib/delayed_job_celluloid/manager.rb, line 78
def stopped?
  @done
end
work(worker) click to toggle source
# File lib/delayed_job_celluloid/manager.rb, line 50
def work(worker)
  @ready.delete(worker)
  @busy << worker
end
worker_died(worker, reason) click to toggle source
# File lib/delayed_job_celluloid/manager.rb, line 65
def worker_died(worker, reason)
  debug "#{worker.inspect} died because of #{reason}" unless reason.nil?
  @busy.delete(worker)
  unless stopped?
    worker = Worker.new_link(@options, current_actor)
    worker.name = "restarted"
    @ready << worker
    worker.async.start
  else
    signal(:shutdown) if @busy.empty?
  end
end
worker_done(worker) click to toggle source
# File lib/delayed_job_celluloid/manager.rb, line 55
def worker_done(worker)
  @busy.delete(worker)
  if stopped?
    worker.terminate if worker.alive?
    signal(:shutdown) if @busy.empty?
  else
    @ready << worker if worker.alive?
  end
end