class Myreplicator::Parallelizer
Attributes
queue[RW]
Public Class Methods
new(*args)
click to toggle source
# File lib/transporter/parallelizer.rb, line 16 def initialize *args options = args.extract_options! @queue = Queue.new @threads = [] @max_threads = options[:max_threads].nil? ? 10 : options[:max_threads] @klass = options[:klass].constantize end
Public Instance Methods
done?()
click to toggle source
Returns true when all jobs are processed and no thread is running
# File lib/transporter/parallelizer.rb, line 91 def done? if @queue.size == 0 && @threads.size == 0 return true end return false end
manage_threads()
click to toggle source
Clears dead threads, frees thread pool for more jobs Exits when no more threads are left
# File lib/transporter/parallelizer.rb, line 63 def manage_threads Thread.new do while(@threads.size > 0) done = [] @threads.each do |t| done << t if t[:thread_state] == "done" || !t.status # puts t.object_id.to_s + "--" + t.status.to_s + "--" + t.to_s # raise "Nil Thread State" if t[:thread_state].nil? end done.each{|d| @threads.delete(d)} # Clear dead threads # If no more jobs are left, mark done if done? @done = true else puts "Sleeping for 2" sleep 2 # Wait for more threads to spawn end end end end
run()
click to toggle source
Runs while there are jobs in the queue Waits for a second and checks for available threads Exits when all jobs are allocated in threads
# File lib/transporter/parallelizer.rb, line 29 def run @done = false @manager_running = false reaper = nil while @queue.size > 0 if @threads.size <= @max_threads @threads << Thread.new(@queue.pop) do |proc| Thread.current[:thread_state] = "running" @klass.new.instance_exec(proc[:params], &proc[:block]) Thread.current[:thread_state] = "done" end else unless @manager_running reaper = manage_threads @manager_running = true end sleep 1 end end # Run manager if thread size never reached max reaper = manage_threads unless @manager_running # Waits until all threads are completed # Before exiting reaper.join end