module Enumerable

Public Instance Methods

threadify(*args, &block) click to toggle source
# File lib/threadify.rb, line 40
def threadify(*args, &block)
# setup
#
  opts = args.last.is_a?(Hash) ? args.pop : {}
  opts.keys.each{|key| opts[key.to_s.to_sym] = opts.delete(key)}
  opts[:threads] ||= (Numeric === args.first ? args.shift : Threadify.threads)
  opts[:strategy] ||= (args.empty? ? Threadify.strategy : args)

  threads = Integer(opts[:threads])
  strategy = opts[:strategy]
  done = Object.new.freeze
  nothing = done
  jobs = Array.new(threads).map{ [] }
  top = Thread.current

# produce jobs
#
  i = 0
  send(*strategy){|*args| jobs[i % threads].push([args, i]); i += 1}
  threads.times{|i| jobs[i].push(done)}

# setup consumer list
#
  consumers = Array.new threads 

# setup support for short-circuit bailout via 'throw :threadify'
#
  thrownv = Hash.new
  thrownq = Queue.new

  caught = false

  catcher = Thread.new do
    loop do
      thrown = thrownq.pop
      break if thrown == done
      i, thrown = thrown
      thrownv[i] = thrown
      caught = true
    end
  end

# fire off the consumers
#
  threads.times do |i|
    consumers[i] = Thread.new(jobs[i]) do |jobsi|
      this = Thread.current
      this.abort_on_exception = Threadify.abort_on_exception
  
      job = nil

      thrown =
        catch(:threadify) do
          loop{
            break if caught
            job = jobsi.shift
            break if job == done
            argv = job.first
            jobsi << (job << block.call(*argv))
          }
          nothing
        end


      unless nothing == thrown
        thrownq.push [i, thrown]
        argv, i = job
      end
    end
  end

# wait for consumers to finish
#
  consumers.map{|t| t.join}

# nuke the catcher
#
  thrownq.push done
  catcher.join

# iff something(s) was thrown return the one which would have been thrown
# earliest in non-parallel execution
#
  unless thrownv.empty?
    key = thrownv.keys.sort.first
    return thrownv[key]
  end

# collect the results and return them
#
  ret = []
  jobs.each do |results|
    results.each do |result|
      break if result == done
      elem, i, value = result
      ret[i] = value
    end
  end
  ret
end