class Roundhouse::Manager

The main router in the system. This manages the processor state and accepts messages from Redis to be dispatched to an idle processor.

Constants

JVM_RESERVED_SIGNALS
PROCTITLES
SPIN_TIME_FOR_GRACEFUL_SHUTDOWN

Attributes

busy[R]
fetcher[RW]
ready[R]

Public Class Methods

new(condvar, options={}) click to toggle source
# File lib/roundhouse/manager.rb, line 26
# Sets up manager state and builds the pool of ready processors.
#
# condvar - stored as @finished; the shutdown paths in this class call
#           #signal on it.
# options - Hash of settings, kept whole in @options. :concurrency gives the
#           number of processors to create and defaults to 25.
#
# Raises ArgumentError when the resolved concurrency is below 1.
def initialize(condvar, options={})
  logger.debug { options.inspect }
  @options = options
  @count = options[:concurrency] || 25
  raise ArgumentError, "Concurrency of #{@count} is not supported" if @count < 1
  @done_callback = nil
  @finished = condvar

  @in_progress = {}
  @threads = {}
  @done = false
  @busy = []
  # Every processor is linked to this actor and tagged with its own
  # object_id as proxy_id.
  @ready = @count.times.map do
    Processor.new_link(current_actor).tap do |processor|
      processor.proxy_id = processor.object_id
    end
  end
end

Public Instance Methods

assign(work) click to toggle source
# File lib/roundhouse/manager.rb, line 113
# Hands one unit of work to a ready processor, or pushes it back when the
# manager has already been stopped.
#
# work - the fetched unit of work; #requeue is called on it in the stopped
#        case, otherwise it is passed to the processor's async #process.
def assign(work)
  watchdog("Manager#assign died") do
    # A fetcher blocked on redis can still deliver a message after #stop has
    # terminated every ready processor. Such late arrivals go back to redis.
    next work.requeue if stopped?

    worker = @ready.pop
    @in_progress[worker.object_id] = work
    @busy << worker
    worker.async.process(work)
  end
end
clean_up_for_graceful_shutdown() click to toggle source
# File lib/roundhouse/manager.rb, line 62
# Completes shutdown once no processor is busy; otherwise schedules another
# check after SPIN_TIME_FOR_GRACEFUL_SHUTDOWN.
#
# Returns true when #shutdown ran during this call, false when a re-check
# was scheduled instead.
def clean_up_for_graceful_shutdown
  unless @busy.empty?
    after(SPIN_TIME_FOR_GRACEFUL_SHUTDOWN) { clean_up_for_graceful_shutdown }
    return false
  end

  shutdown
  true
end
heartbeat(key, data, json) click to toggle source
# File lib/roundhouse/manager.rb, line 146
# Refreshes the process title, sends one heartbeat, and re-arms itself to
# run again in 5 seconds.
#
# key  - forwarded unchanged to the heartbeat sender.
# data - passed, together with this manager, to every PROCTITLES callable.
# json - forwarded unchanged to the heartbeat sender.
def heartbeat(key, data, json)
  # Each PROCTITLES entry contributes one title fragment; nil fragments are
  # dropped and the rest are joined with single spaces.
  $0 = PROCTITLES.map { |title| title.call(self, data) }.compact.join(' ')

  # NOTE(review): the rendered listing showed a bare "(key, json)" here, which
  # does not parse. The callee name is restored as the heart-named method;
  # confirm against lib/roundhouse/manager.rb.
  ❤(key, json)

  after(5) { heartbeat(key, data, json) }
end
processor_died(processor, reason) click to toggle source
# File lib/roundhouse/manager.rb, line 96
# Cleans up after a processor that died and, unless the manager is stopping,
# replaces it with a fresh linked processor.
#
# processor - the dead processor; its object_id keys the @in_progress and
#             @threads entries that are removed.
# reason    - accepted for the caller's benefit; not used by this method.
def processor_died(processor, reason)
  watchdog("Manager#processor_died died") do
    @in_progress.delete(processor.object_id)
    @threads.delete(processor.object_id)
    @busy.delete(processor)

    if stopped?
      # While stopping, the last busy processor going away completes shutdown.
      shutdown if @busy.empty?
    else
      replacement = Processor.new_link(current_actor)
      replacement.proxy_id = replacement.object_id
      @ready << replacement
      dispatch
    end
  end
end
processor_done(processor) click to toggle source
# File lib/roundhouse/manager.rb, line 80
# Records that a processor finished its work and either returns it to the
# ready pool or, when stopping, terminates it.
#
# processor - the processor that finished; its object_id keys the
#             @in_progress and @threads entries that are removed.
def processor_done(processor)
  watchdog('Manager#processor_done died') do
    # The block registered through #when_done, if any, runs first.
    @done_callback.call(processor) if @done_callback

    [@in_progress, @threads].each { |table| table.delete(processor.object_id) }
    @busy.delete(processor)

    if stopped?
      processor.terminate if processor.alive?
      # The last busy processor finishing completes shutdown.
      shutdown if @busy.empty?
    elsif processor.alive?
      @ready << processor
    end

    dispatch
  end
end
real_thread(proxy_id, thr) click to toggle source

A hack worthy of Rube Goldberg. We need to be able to hard stop a working thread. But there’s no way for us to get a handle to the underlying thread performing work for a processor, so we have it call us and tell us.

# File lib/roundhouse/manager.rb, line 134
# Remembers the thread doing work on behalf of a processor so that
# hard_shutdown_in can later raise into it.
#
# proxy_id - key under which the thread is stored in @threads.
# thr      - the working thread; ignored unless it is alive.
def real_thread(proxy_id, thr)
  return unless thr.alive?

  @threads[proxy_id] = thr
end
start() click to toggle source
# File lib/roundhouse/manager.rb, line 72
# Kicks off processing by calling #dispatch once per ready processor.
def start
  @ready.each do |_processor|
    dispatch
  end
end
stop(options={}) click to toggle source
# File lib/roundhouse/manager.rb, line 45
# Stops the manager: marks it done, terminates idle processors, and then
# either finishes immediately or arranges for a hard shutdown.
#
# options - Hash; :shutdown (truthy to schedule a hard shutdown when workers
#           are still busy) and :timeout (delay passed to hard_shutdown_in).
def stop(options={})
  watchdog('Manager#stop died') do
    force = options[:shutdown]
    grace_period = options[:timeout]

    @done = true

    logger.info { "Terminating #{@ready.size} quiet workers" }
    @ready.each do |idle|
      next unless idle.alive?

      idle.terminate
    end
    @ready.clear

    # Nothing was busy, so shutdown already ran; leave the method here.
    return if clean_up_for_graceful_shutdown

    hard_shutdown_in(grace_period) if force
  end
end
stopped?() click to toggle source
# File lib/roundhouse/manager.rb, line 157
# Reports whether #stop has been called. Returns the @done flag, which
# #initialize sets to false and #stop sets to true.
def stopped?
  @done
end
when_done(&blk) click to toggle source
# File lib/roundhouse/manager.rb, line 76
# Registers a block to be run whenever a processor finishes work.
# processor_done calls it with the finished processor before doing its own
# bookkeeping. Calling this again replaces the previously stored block.
def when_done(&blk)
  @done_callback = blk
end

Private Instance Methods

dispatch() click to toggle source
# File lib/roundhouse/manager.rb, line 210
# Asks the fetcher for more work, unless the manager has been stopped.
#
# Raises RuntimeError when there is no ready processor; the message
# distinguishes "no processors at all" from "all processors busy".
def dispatch
  return if stopped?

  if @ready.empty?
    # Safety check: with nothing ready and nothing busy, processors have
    # been leaked somehow.
    raise "BUG: No processors, cannot continue!" if @busy.empty?

    raise "No ready processor!?"
  end

  @fetcher.async.fetch
end
hard_shutdown_in(delay) click to toggle source
# File lib/roundhouse/manager.rb, line 187
# Schedules a forced shutdown: after the delay, in-progress work is requeued,
# Shutdown is raised into the recorded threads of processors that are still
# busy and alive, and @finished is signalled.
#
# delay - value handed to `after` and shown in the log message as seconds.
def hard_shutdown_in(delay)
  logger.info { "Pausing up to #{delay} seconds to allow workers to finish..." }

  after(delay) do
    watchdog("Manager#hard_shutdown_in died") do
      # The grace period is over and workers are still busy. Their threads
      # get interrupted, but the work is pushed back first.
      logger.warn { "Terminating #{@busy.size} busy worker threads" }
      logger.warn { "Work still in progress #{@in_progress.values.inspect}" }

      requeue

      @busy.each do |worker|
        next unless worker.alive?

        # Only processors that reported a thread via real_thread have one.
        thread = @threads.delete(worker.object_id)
        thread.raise Shutdown if thread
      end

      @finished.signal
    end
  end
end
requeue() click to toggle source
# File lib/roundhouse/manager.rb, line 225
# Pushes every in-progress unit of work back through the fetcher strategy's
# bulk_requeue, then empties @in_progress.
def requeue
  # NOTE: a job may be pushed back to redis before its worker thread has
  # been terminated. That is acceptable because Roundhouse's contract is
  # that jobs run AT LEAST once: termination waits until the jobs are known
  # to be back in Redis, since losing a job is worse than running it twice.
  strategy = Roundhouse::Fetcher.strategy
  unfinished = @in_progress.values
  strategy.bulk_requeue(unfinished, @options)
  @in_progress.clear
end
shutdown() click to toggle source
# File lib/roundhouse/manager.rb, line 220
# Completes shutdown: pushes any in-progress work back via #requeue, then
# signals @finished (the condvar handed to #initialize).
def shutdown
  requeue
  @finished.signal
end
❤(key, json) click to toggle source
# File lib/roundhouse/manager.rb, line 163
# Sends one heartbeat to redis and acts on a pending signal, if any.
#
# In a single MULTI it adds key to the 'processes' set, stores the info
# payload together with the busy count and a timestamp under key, sets a
# 60-second expiry on key, and pops one entry from "#{key}-signals".
#
# key  - redis key identifying this process.
# json - payload stored in the 'info' field.
#
# NOTE(review): the rendered listing showed "def (key, json)", which does not
# parse. The name is restored as the heart character; confirm against
# lib/roundhouse/manager.rb.
def ❤(key, json)
  # The popped signal is the fourth reply of the transaction.
  _, _, _, msg = Roundhouse.redis do |conn|
    conn.multi do
      conn.sadd('processes', key)
      conn.hmset(key, 'info', json, 'busy', @busy.size, 'beat', Time.now.to_f)
      conn.expire(key, 60)
      conn.rpop("#{key}-signals")
    end
  end

  return unless msg

  if JVM_RESERVED_SIGNALS.include?(msg)
    # These signals are handed to the CLI directly instead of being sent to
    # the process.
    Roundhouse::CLI.instance.handle_signal(msg)
  else
    ::Process.kill(msg, ::Process.pid)
  end
rescue => e
  # Best effort: redis/network failures are logged and ignored. This also
  # covers errors raised while handling the popped signal.
  logger.error("heartbeat: #{e.message}")
end