module Parallel

Constants

Stop
VERSION

Public Class Methods

all?(*args, &block) click to toggle source
# File lib/parallel.rb, line 225
def all?(*args, &block)
  raise "You must provide a block when calling #all?" if block.nil?
  !!each(*args) { |*a| raise Parallel::Kill unless block.call(*a) }
end
any?(*args, &block) click to toggle source
# File lib/parallel.rb, line 220
def any?(*args, &block)
  raise "You must provide a block when calling #any?" if block.nil?
  !each(*args) { |*a| raise Parallel::Kill if block.call(*a) }
end
each(array, options={}, &block) click to toggle source
# File lib/parallel.rb, line 216
def each(array, options={}, &block)
  map(array, options.merge(:preserve_results => false), &block)
end
each_with_index(array, options={}, &block) click to toggle source
# File lib/parallel.rb, line 230
def each_with_index(array, options={}, &block)
  each(array, options.merge(:with_index => true), &block)
end
in_processes(options = {}, &block) click to toggle source
# File lib/parallel.rb, line 210
def in_processes(options = {}, &block)
  count, options = extract_count_from_options(options)
  count ||= processor_count
  map(0...count, options.merge(:in_processes => count), &block)
end
in_threads(options={:count => 2}) { |i| ... } click to toggle source
# File lib/parallel.rb, line 203
def in_threads(options={:count => 2})
  count, _ = extract_count_from_options(options)
  Array.new(count) do |i|
    Thread.new { yield(i) }
  end.map!(&:value)
end
map(source, options = {}, &block) click to toggle source
# File lib/parallel.rb, line 234
def map(source, options = {}, &block)
  options[:mutex] = Mutex.new

  if RUBY_PLATFORM =~ /java/ and not options[:in_processes]
    method = :in_threads
    size = options[method] || processor_count
  elsif options[:in_threads]
    method = :in_threads
    size = options[method]
  else
    method = :in_processes
    if Process.respond_to?(:fork)
      size = options[method] || processor_count
    else
      warn "Process.fork is not supported by this Ruby"
      size = 0
    end
  end

  job_factory = JobFactory.new(source, options[:mutex])
  size = [job_factory.size, size].min

  options[:return_results] = (options[:preserve_results] != false || !!options[:finish])
  add_progress_bar!(job_factory, options)

  results = if size == 0
    work_direct(job_factory, options, &block)
  elsif method == :in_threads
    work_in_threads(job_factory, options.merge(:count => size), &block)
  else
    work_in_processes(job_factory, options.merge(:count => size), &block)
  end
  if results
    options[:return_results] ? results : source
  end
end
map_with_index(array, options={}, &block) click to toggle source
# File lib/parallel.rb, line 271
def map_with_index(array, options={}, &block)
  map(array, options.merge(:with_index => true), &block)
end
worker_number() click to toggle source
# File lib/parallel.rb, line 275
def worker_number
  Thread.current[:parallel_worker_number]
end
worker_number=(worker_num) click to toggle source
# File lib/parallel.rb, line 279
def worker_number=(worker_num)
  Thread.current[:parallel_worker_number] = worker_num
end

Private Class Methods

add_progress_bar!(job_factory, options) click to toggle source
# File lib/parallel.rb, line 285
def add_progress_bar!(job_factory, options)
  if progress_options = options[:progress]
    raise "Progressbar can only be used with array like items" if job_factory.size == Float::INFINITY
    require 'ruby-progressbar'

    if progress_options == true
      progress_options = { title: "Progress" }
    elsif progress_options.respond_to? :to_str
      progress_options = { title: progress_options.to_str }
    end

    progress_options = {
      total: job_factory.size,
      format: '%t |%E | %B | %a'
    }.merge(progress_options)

    progress = ProgressBar.create(progress_options)
    old_finish = options[:finish]
    options[:finish] = lambda do |item, i, result|
      old_finish.call(item, i, result) if old_finish
      progress.increment
    end
  end
end
call_with_index(item, index, options, &block) click to toggle source
# File lib/parallel.rb, line 480
def call_with_index(item, index, options, &block)
  args = [item]
  args << index if options[:with_index]
  if options[:return_results]
    block.call(*args)
  else
    block.call(*args)
    nil # avoid GC overhead of passing large results around
  end
end
create_workers(job_factory, options, &block) click to toggle source
# File lib/parallel.rb, line 416
def create_workers(job_factory, options, &block)
  workers = []
  Array.new(options[:count]).each_with_index do |_, i|
    workers << worker(job_factory, options.merge(started_workers: workers, worker_number: i), &block)
  end
  workers
end
extract_count_from_options(options) click to toggle source

options is either a Integer or a Hash with :count

# File lib/parallel.rb, line 470
def extract_count_from_options(options)
  if options.is_a?(Hash)
    count = options[:count]
  else
    count = options
    options = {}
  end
  [count, options]
end
handle_exception(exception, results) click to toggle source
# File lib/parallel.rb, line 463
def handle_exception(exception, results)
  return nil if [Parallel::Break, Parallel::Kill].include? exception.class
  raise exception if exception
  results
end
process_incoming_jobs(read, write, job_factory, options, &block) click to toggle source
# File lib/parallel.rb, line 450
def process_incoming_jobs(read, write, job_factory, options, &block)
  until read.eof?
    data = Marshal.load(read)
    item, index = job_factory.unpack(data)
    result = begin
      call_with_index(item, index, options, &block)
    rescue
      ExceptionWrapper.new($!)
    end
    Marshal.dump(result, write)
  end
end
replace_worker(job_factory, workers, i, options, blk) click to toggle source
# File lib/parallel.rb, line 404
def replace_worker(job_factory, workers, i, options, blk)
  options[:mutex].synchronize do
    # old worker is no longer used ... stop it
    worker = workers[i]
    worker.stop if worker

    # create a new replacement worker
    running = workers - [worker]
    workers[i] = worker(job_factory, options.merge(started_workers: running, worker_number: i), &blk)
  end
end
with_instrumentation(item, index, options) { || ... } click to toggle source
# File lib/parallel.rb, line 491
def with_instrumentation(item, index, options)
  on_start = options[:start]
  on_finish = options[:finish]
  options[:mutex].synchronize { on_start.call(item, index) } if on_start
  result = yield
  options[:mutex].synchronize { on_finish.call(item, index, result) } if on_finish
  result unless options[:preserve_results] == false
end
work_direct(job_factory, options, &block) click to toggle source
# File lib/parallel.rb, line 310
def work_direct(job_factory, options, &block)
  self.worker_number = 0
  results = []
  exception = nil
  begin
    while set = job_factory.next
      item, index = set
      results << with_instrumentation(item, index, options) do
        call_with_index(item, index, options, &block)
      end
    end
  rescue
    exception = $!
  end
  handle_exception(exception, results)
ensure
  self.worker_number = nil
end
work_in_processes(job_factory, options, &blk) click to toggle source
# File lib/parallel.rb, line 354
def work_in_processes(job_factory, options, &blk)
  workers = if options[:isolation]
    [] # we create workers per job and not beforehand
  else
    create_workers(job_factory, options, &blk)
  end
  results = []
  results_mutex = Mutex.new # arrays are not thread-safe
  exception = nil

  UserInterruptHandler.kill_on_ctrl_c(workers.map(&:pid), options) do
    in_threads(options) do |i|
      worker = workers[i]

      begin
        loop do
          break if exception
          item, index = job_factory.next
          break unless index

          if options[:isolation]
            worker = replace_worker(job_factory, workers, i, options, blk)
          end

          worker.thread = Thread.current

          begin
            result = with_instrumentation item, index, options do
              worker.work(job_factory.pack(item, index))
            end
            results_mutex.synchronize { results[index] = result } # arrays are not threads safe on jRuby
          rescue
            exception = $!
            if Parallel::Kill === exception
              (workers - [worker]).each do |w|
                w.thread.kill unless w.thread.nil?
                UserInterruptHandler.kill(w.pid)
              end
            end
          end
        end
      ensure
        worker.stop if worker
      end
    end
  end

  handle_exception(exception, results)
end
work_in_threads(job_factory, options, &block) click to toggle source
# File lib/parallel.rb, line 329
def work_in_threads(job_factory, options, &block)
  raise "interrupt_signal is no longer supported for threads" if options[:interrupt_signal]
  results = []
  results_mutex = Mutex.new # arrays are not thread-safe on jRuby
  exception = nil

  in_threads(options) do |worker_num|
    self.worker_number = worker_num
    # as long as there are more jobs, work on one of them
    while !exception && set = job_factory.next
      begin
        item, index = set
        result = with_instrumentation item, index, options do
          call_with_index(item, index, options, &block)
        end
        results_mutex.synchronize { results[index] = result }
      rescue
        exception = $!
      end
    end
  end

  handle_exception(exception, results)
end
worker(job_factory, options, &block) click to toggle source
# File lib/parallel.rb, line 424
def worker(job_factory, options, &block)
  child_read, parent_write = IO.pipe
  parent_read, child_write = IO.pipe

  pid = Process.fork do
    self.worker_number = options[:worker_number]

    begin
      options.delete(:started_workers).each(&:close_pipes)

      parent_write.close
      parent_read.close

      process_incoming_jobs(child_read, child_write, job_factory, options, &block)
    ensure
      child_read.close
      child_write.close
    end
  end

  child_read.close
  child_write.close

  Worker.new(parent_read, parent_write, pid)
end