class JobRunner

Runs many jobs in parallel, and returns their interleaved results. (NOTE: The JobRunner can be run multiple times; each time the blocks

will be executed again.)

Examples:

JobRunner.new do |jr|

jr.add { 3 }
jr.add { sleep 0.1; 2 }
jr.add { sleep 0.2; 1 }

jr.each_result do |result|
  p result
end

end

jr = JobRunner.new(

proc { 1 },
proc { 2 },
proc { 3 }

)

2.times do

jr.each_result { |result| p result }

end

Public Class Methods

new(*blocks, debug: false) { |self| ... } click to toggle source
# File lib/epitools/job_runner.rb, line 29
def initialize(*blocks, debug: false)
  @threads = []
  @results = Thread::Queue.new
  @jobs    = []
  @started = false
  @debug   = debug

  if blocks.any?
    blocks.each { |block| add &block }
  else
    yield self if block_given?
  end
end

Public Instance Methods

add(&block) click to toggle source
# File lib/epitools/job_runner.rb, line 47
def add(&block)
  dmsg("added job #{block}")
  @jobs << block
end
dmsg(msg) click to toggle source
# File lib/epitools/job_runner.rb, line 43
def dmsg(msg)
  puts "[#{Time.now}] #{msg}" if @debug
end
each_result() { |pop| ... } click to toggle source
# File lib/epitools/job_runner.rb, line 78
def each_result
  go! unless @started

  loop do
    yield @results.pop
    reap!
    break if @threads.empty? and @results.empty?
  end

  @started = false
end
go!() click to toggle source
# File lib/epitools/job_runner.rb, line 61
def go!
  if @started
    raise "Error: already started"
  else
    dmsg("starting #{@threads.size} jobs")
  end

  @started = true
  @jobs.each do |job|
    dmsg("adding #{job}")
    @threads << Thread.new do
      @results << job.call
      dmsg("job #{job} complete")
    end
  end
end
reap!() click to toggle source
# File lib/epitools/job_runner.rb, line 52
def reap!
  if @threads.any?
    dmsg("reaping #{@threads.size} threads")
    @threads.delete_if { |t| not t.alive? }
  else
    dmsg("reap failed: no threads")
  end
end