class Myreplicator::Parallelizer

Attributes

queue[RW]

Public Class Methods

new(*args) click to toggle source
# File lib/transporter/parallelizer.rb, line 16
# Builds an empty parallelizer.
#
# The trailing options hash (taken from +args+ via +extract_options!+) is read for:
#   :max_threads - thread limit stored in @max_threads; 10 is used when the
#                  key is absent or nil
#   :klass       - a value responding to +constantize+; the resulting
#                  constant is stored in @klass
#
# Also creates the job queue (@queue) and the empty worker list (@threads).
def initialize(*args)
  options = args.extract_options!
  requested_limit = options[:max_threads]

  @queue = Queue.new
  @threads = []
  if requested_limit.nil?
    @max_threads = 10
  else
    @max_threads = requested_limit
  end
  @klass = options[:klass].constantize
end

Public Instance Methods

done?() click to toggle source

Returns true when all jobs are processed and no thread is running

# File lib/transporter/parallelizer.rb, line 91
# Reports whether all work is finished.
#
# Returns true only when the job queue (@queue) holds no entries and the
# worker list (@threads) is empty; returns false otherwise.
def done?
  @queue.size.zero? && @threads.size.zero?
end
manage_threads() click to toggle source

Clears dead threads, freeing the thread pool for more jobs. Exits when no more threads are left.

# File lib/transporter/parallelizer.rb, line 63
# Starts a background thread that prunes finished workers from @threads.
#
# On every pass a worker is treated as finished when its :thread_state
# thread-local equals "done" or when its status is falsy (the thread has
# terminated). Finished workers are removed from @threads. If done? then
# reports true, @done is set; otherwise the loop prints a notice and sleeps
# for two seconds before checking again. The loop stops as soon as @threads
# is empty.
#
# Returns the Thread running this loop so the caller can join it.
def manage_threads
  Thread.new do
    until @threads.size == 0
      # Collect first, delete afterwards, so the list is not modified while
      # it is being walked.
      finished = @threads.select do |worker|
        worker[:thread_state] == "done" || !worker.status
      end
      finished.each { |worker| @threads.delete(worker) }

      if done?
        # Queue drained and every worker pruned.
        @done = true
      else
        puts "Sleeping for 2"
        sleep 2 # Give running workers time to finish / new ones time to spawn
      end
    end
  end
end
run() click to toggle source

Runs while there are jobs in the queue. Waits for a second and checks for available threads. Exits when all jobs are allocated in threads.

# File lib/transporter/parallelizer.rb, line 29
# Drains @queue, running each job on its own thread, and blocks until every
# worker has been pruned from @threads.
#
# Each queue entry is expected to respond to [:params] and [:block]; the
# block is evaluated with instance_exec on a fresh @klass instance and
# receives the params as its argument. Workers publish their progress in the
# :thread_state thread-local ("running", then "done").
#
# At most @max_threads workers are kept in @threads at once (a limit below 1
# is treated as 1, since a pool without a slot could never make progress).
# When the pool is full, the reaper from manage_threads is started -- or
# restarted if a previous one has already exited -- and this method sleeps
# for a second before checking for a free slot again.
#
# Returns the last reaper Thread after it has been joined.
def run
  @done = false
  @manager_running = false
  reaper = nil
  limit = [@max_threads, 1].max

  while @queue.size > 0
    if @threads.size < limit
      worker = Thread.new(@queue.pop) do |job|
        Thread.current[:thread_state] = "running"
        @klass.new.instance_exec(job[:params], &job[:block])
        Thread.current[:thread_state] = "done"
      end
      @threads << worker
    else
      # The reaper exits whenever it sees @threads empty, so it may already
      # be gone by the time the pool fills up again; without a live reaper
      # no slot would ever be freed.
      unless reaper && reaper.alive?
        reaper = manage_threads
        @manager_running = true
      end
      sleep 1
    end
  end

  # Wait until all workers are completed before returning. A reaper that was
  # already on its way out may miss the last workers, so keep going until
  # @threads is really empty.
  loop do
    unless reaper && reaper.alive?
      reaper = manage_threads
      @manager_running = true
    end
    reaper.join
    break if @threads.empty?
  end

  reaper
end