class Jober::ThreadedManager

Public Class Methods

new(klasses = nil, opts = {}) click to toggle source
# File lib/jober/threaded_manager.rb, line 13
# Builds one job object per requested class.
#
# @param klasses [Class, String, Array<Class, String>, nil] job classes, or
#   names that are resolved through Jober.find_class; when nil,
#   Jober.auto_classes is used instead
# @param opts [Hash] options merged (without mutation) into each job's
#   constructor argument together with a :unique_id
# @raise [RuntimeError] when a String entry cannot be resolved to a class
def initialize(klasses = nil, opts = {})
  @klasses = Array(klasses || Jober.auto_classes)
  @stopped = false

  # Per-class counter: the Nth instance of a class receives unique_id N-1.
  instances_per_class = Hash.new(0)

  @objects = @klasses.map do |entry|
    job_class = entry
    if entry.is_a?(String)
      job_class = Jober.find_class(entry)
      raise "unknown class #{entry}" unless job_class
    end

    unique_id = instances_per_class[job_class]
    job = job_class.new(opts.merge(:unique_id => unique_id))
    instances_per_class[job_class] = unique_id + 1
    job
  end
end

Public Instance Methods

default_sleep() click to toggle source
# File lib/jober/threaded_manager.rb, line 9
# Grace period that run_loop hands to wait_for_kill (as seconds).
# Falls back to 10 (and remembers it) when nothing truthy has been set.
#
# @return [Object] the stored value, or 10 by default
def default_sleep
  @default_sleep = 10 unless @default_sleep
  @default_sleep
end
default_sleep=(ds) click to toggle source
# File lib/jober/threaded_manager.rb, line 5
# Overrides the value returned by default_sleep.
# run_loop converts it with to_f and passes it to wait_for_kill.
#
# @param ds [Object] new grace period; stored as given, without validation
def default_sleep=(ds)
  @default_sleep = ds
end
run_loop() click to toggle source
# File lib/jober/threaded_manager.rb, line 35
# Starts one thread per job object, blocks until a stop is requested
# (INT/QUIT signal or stop!), then asks every job to stop, waits up to
# default_sleep for the threads to exit and kills whatever is still alive.
#
# @return [Array<Thread>] the threads that were still alive and got killed
def run_loop
  info { "run loop for #{@klasses.inspect}, in threads: #{@objects.length}" }
  @threads = @objects.map { |job| make_thread(job) }

  # Both signals only raise the stop flag; the real shutdown happens below.
  %w[INT QUIT].each do |signal|
    trap(signal) { @stopped = true }
  end

  # Block here until the stop flag is raised.
  sleeping

  info { "prepare quit ..." }

  # Ask every job to stop, then give them a short head start.
  @objects.each(&:stop!)
  sleep(0.2)

  # Wait for the worker threads to exit, bounded by the grace period.
  wait_for_kill(default_sleep.to_f)

  unfinished = not_finished_objects_names
  if unfinished.empty?
    info { "quit!" }
  else
    info { "quit! and force killing #{unfinished.inspect}" }
  end

  # Force-kill whatever did not exit within the grace period.
  survivors = @threads.select(&:alive?)
  survivors.each(&:kill)
end
set_objects(objects) click to toggle source
# File lib/jober/threaded_manager.rb, line 29
# Replaces the managed job objects with already-built instances and
# renumbers them: each object's @unique_id is overwritten with its
# zero-based position in +objects+.
#
# @param objects [Enumerable] job instances to manage
# @return [Enumerable] the same +objects+ collection, now stored in @objects
def set_objects(objects)
  objects.each_with_index do |job, position|
    job.instance_variable_set(:@unique_id, position)
  end
  @objects = objects
end
stop!() click to toggle source
# File lib/jober/threaded_manager.rb, line 68
# Requests shutdown by raising the @stopped flag.
# The flag is polled by the sleeping loop and by the worker-thread loops
# built in make_thread, so the effect is not immediate.
def stop!
  @stopped = true
end

Private Instance Methods

make_thread(obj) click to toggle source
# File lib/jober/threaded_manager.rb, line 96
# Spawns a worker thread that keeps calling obj.run_loop (wrapped in
# obj.catch) until the manager's stop flag is raised, pausing one second
# between runs.
#
# @param obj [Object] job object; must respond to catch and run_loop
# @return [Thread] the started thread
def make_thread(obj)
  Thread.new(obj) do |job|
    loop do
      # Checked before each run so a stop requested while idle skips it.
      break if @stopped

      job.catch { job.run_loop }

      # Checked again so shutdown does not pay for the pause below.
      break if @stopped

      sleep 1.0
    end
  end
end
not_finished_objects_names() click to toggle source
# File lib/jober/threaded_manager.rb, line 81
# Class names of the managed objects whose +finished+ is exactly false
# (nil or any other value is not counted).
#
# @return [Array<String>] class names, in @objects order, duplicates kept
def not_finished_objects_names
  @objects.each_with_object([]) do |job, names|
    names << job.class.name if job.finished == false
  end
end
sleeping() click to toggle source
# File lib/jober/threaded_manager.rb, line 74
# Blocks the calling thread until the stop flag is raised, polling it
# every half second. Always sleeps at least once before the first check.
#
# @return [nil]
def sleeping
  loop do
    sleep 0.5
    break if @stopped
  end
end
wait_for_kill(interval) click to toggle source
# File lib/jober/threaded_manager.rb, line 85
# Blocks until every thread in @threads has exited or +interval+ seconds
# have passed, whichever comes first.
#
# A monotonic deadline is used rather than Timeout.timeout, because
# Timeout treats 0 as "no timeout": a zero grace period would otherwise
# wait forever on a job that never finishes. Liveness is checked before the
# first pause, so already-finished threads return immediately.
#
# @param interval [Numeric] maximum number of seconds to wait
# @return [nil]
def wait_for_kill(interval)
  info { "waiting quiting jobs (#{not_finished_objects_names.inspect}) for %.1fm ..." % [interval / 60.0] }

  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + interval.to_f
  loop do
    return if @threads.none?(&:alive?)

    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    return if remaining <= 0

    # Poll every 0.3s, but never sleep past the deadline.
    sleep [0.3, remaining].min
  end
end