class Toro::Manager

Attributes

busy[R]
ready[R]

Public Class Methods

new(options={}) click to toggle source
# File lib/toro/manager.rb, line 10
# Builds a manager: a pool of idle processors plus the fetcher and listener
# that work with it.
#
# options - Hash merged over the defaults:
#           :concurrency - how many processors to create (default: 1).
#           :queues      - queues handed to the fetcher and listener
#                          (default: [Toro.options[:default_queue]]).
def initialize(options={})
  settings = {
    concurrency: 1,
    queues: [Toro.options[:default_queue]],
  }.merge(options)

  @queues = settings[:queues]
  @threads = {}
  @busy = []
  @is_done = false

  # Every processor starts out idle. Its proxy_id is set to the object_id of
  # the handle returned by new_link.
  @ready = settings[:concurrency].times.map do
    Processor.new_link(current_actor).tap do |processor|
      processor.proxy_id = processor.object_id
    end
  end

  @fetcher = Fetcher.new({ manager: current_actor, queues: settings[:queues] })
  @listener = Listener.new({ queues: @queues, fetcher: @fetcher, manager: current_actor })
end

Public Instance Methods

assign(job) click to toggle source
# File lib/toro/manager.rb, line 52
# Hands a job to an idle processor.
#
# job - the job passed on to the processor's process method.
#
# Moves the most recently added ready processor to the busy list and asks it
# to process the job asynchronously.
#
# Raises RuntimeError if no processor is ready.
def assign(job)
  raise 'No processors ready' unless is_ready?

  worker = @ready.pop
  @busy << worker
  worker.async.process(job)
end
clean_up_for_graceful_shutdown() click to toggle source
# File lib/toro/manager.rb, line 69
# Completes the shutdown once no processor is busy.
#
# Returns true after calling shutdown when @busy is empty. Otherwise it
# schedules another attempt after Toro.options[:graceful_shutdown_time] and
# returns false.
def clean_up_for_graceful_shutdown
  unless @busy.empty?
    # Work is still in flight; check again later.
    after(Toro.options[:graceful_shutdown_time]) { clean_up_for_graceful_shutdown }
    return false
  end

  shutdown
  true
end
dispatch() click to toggle source
# File lib/toro/manager.rb, line 63
# Asks the fetcher, asynchronously, to fetch.
#
# Raises RuntimeError when there is no ready processor; the message
# distinguishes "no processors at all" from "all processors busy".
def dispatch
  if @ready.empty?
    raise "No processors, cannot continue!" if @busy.empty?

    raise "No ready processor!?"
  end

  @fetcher.async.fetch
end
hard_shutdown_in(delay) click to toggle source
# File lib/toro/manager.rb, line 79
# Schedules a forced shutdown for when the grace period runs out.
#
# delay - value passed to after(); the log message reports it in seconds.
#
# When the scheduled block runs it requeues, raises Shutdown in the recorded
# thread of every busy processor that is still alive, and then signals
# shutdown.
def hard_shutdown_in(delay)
  Toro.logger.info "Pausing up to #{delay} seconds to allow workers to finish..."

  after(delay) do
    Toro.logger.warn "Terminating #{@busy.size} busy worker threads"

    # Requeue before interrupting the remaining workers.
    requeue

    @busy.each do |processor|
      next unless processor.alive?

      # Only a processor with a recorded thread can be interrupted.
      worker_thread = @threads.delete(processor.object_id)
      worker_thread.raise Shutdown if worker_thread
    end

    signal_shutdown
  end
end
heartbeat() click to toggle source
# File lib/toro/manager.rb, line 119
# Re-schedules itself every 5 (via after) for as long as the manager is not
# stopped. Does nothing once stopped? is true.
def heartbeat
  after(5) { heartbeat } unless stopped?
end
is_ready?() click to toggle source
# File lib/toro/manager.rb, line 59
# Returns true when at least one processor is in the ready list, false
# otherwise.
def is_ready?
  @ready.size > 0
end
processor_complete(processor) click to toggle source
# File lib/toro/manager.rb, line 127
# Bookkeeping for a processor that has finished its job.
#
# processor - the processor reporting completion.
#
# Always forgets the processor's recorded thread and removes it from the busy
# list. While running, a still-alive processor goes back to the ready list and
# dispatch is called. Once stopped, a still-alive processor is terminated and
# shutdown is called when nothing else is busy.
def processor_complete(processor)
  @threads.delete(processor.object_id)
  @busy.delete(processor)

  unless stopped?
    @ready << processor if processor.alive?
    return dispatch
  end

  processor.terminate if processor.alive?
  shutdown if @busy.empty?
end
requeue() click to toggle source
# File lib/toro/manager.rb, line 104
# Resets every job that is 'running' and was started by this process
# (Toro.process_identity) back to 'queued', clearing started_by and
# started_at. Runs inside Toro::Database.with_connection.
def requeue
  Toro::Database.with_connection do
    in_flight = Job.where(status: 'running', started_by: Toro.process_identity)
    in_flight.update_all(status: 'queued', started_by: nil, started_at: nil)
  end
end
set_thread(proxy_id, thread) click to toggle source
# File lib/toro/manager.rb, line 115
# Records the thread associated with a processor.
#
# proxy_id - key under which the thread is stored in @threads.
# thread   - the thread to remember.
#
# Returns the thread.
def set_thread(proxy_id, thread)
  @threads.store(proxy_id, thread)
end
shutdown() click to toggle source
# File lib/toro/manager.rb, line 99
# Requeues the jobs this process still has marked as running (see requeue),
# then schedules the :shutdown signal via signal_shutdown.
def shutdown
  requeue
  signal_shutdown
end
signal_shutdown() click to toggle source
# File lib/toro/manager.rb, line 111
# Schedules signal(:shutdown) through after(0) instead of calling it inline.
def signal_shutdown
  after(0) do
    signal(:shutdown)
  end
end
start() click to toggle source
# File lib/toro/manager.rb, line 29
# Starts the manager: clears the stopped flag, starts the listener
# asynchronously, calls dispatch once per ready processor and begins the
# heartbeat.
def start
  @is_done = false
  @listener.async.start

  @ready.each do |_processor|
    dispatch
  end

  heartbeat
end
stop() click to toggle source
# File lib/toro/manager.rb, line 36
# Stops the manager.
#
# Marks the manager as done, terminates and drops every idle processor,
# terminates the fetcher, and stops/terminates the listener. If the graceful
# clean-up cannot finish right away because processors are still busy, a hard
# shutdown is scheduled after Toro.options[:hard_shutdown_time].
def stop
  @is_done = true

  Toro.logger.debug "Shutting down #{@ready.size} quiet workers"
  @ready.each do |processor|
    next unless processor.alive?

    processor.terminate
  end
  @ready.clear

  @fetcher.terminate if @fetcher.alive?

  if @listener.alive?
    # Stop the registered listener, if there is one, before terminating ours.
    actors[:listener].stop if actors[:listener]
    @listener.terminate
  end

  hard_shutdown_in(Toro.options[:hard_shutdown_time]) unless clean_up_for_graceful_shutdown
end
stopped?() click to toggle source
# File lib/toro/manager.rb, line 139
# Returns the @is_done flag: true once #stop has been called, false after
# construction or a subsequent #start.
def stopped?
  @is_done
end