class ZTK::Parallel

Parallel Processing Class

This class can be used to easily run iterative and linear processes in a parallel manner.

The before fork callback is called once in the parent process.

The after fork callback is called twice, once in the parent process and once in the child process.

@example Parallel processing with callbacks

a_callback = Proc.new do |pid|
  puts "Hello from After Callback - PID #{pid}"
end

b_callback = Proc.new do |pid|
  puts "Hello from Before Callback - PID #{pid}"
end

parallel = ZTK::Parallel.new
parallel.config do |config|
  config.before_fork = b_callback
  config.after_fork = a_callback
end

puts Process.pid.inspect

3.times do |x|
  parallel.process do
    x
  end
end

parallel.waitall
puts parallel.results.inspect

@author Zachary Patten <zpatten AT jovelabs DOT io>

Constants

DEFAULT_CHILD_TIMEOUT

Child process timeout in seconds; <= 0 to disable

MAX_FORKS

Default Maximum Number of Forks

MAX_MEMORY

Platform's memory capacity in bytes

TRAPPED_SIGNALS

Attributes

results[RW]

Result Set

Public Class Methods

new(configuration={}) click to toggle source

@param [Hash] configuration Configuration options hash. @option configuration [Integer] :max_forks Maximum number of forks to use. @option configuration [Proc] :before_fork (nil) Proc to call before forking. @option configuration [Proc] :after_fork (nil) Proc to call after forking.

Calls superclass method ZTK::Base::new
# File lib/ztk/parallel.rb, line 97
# Builds a new parallel runner.
#
# The defaults for :max_forks, :raise_exceptions and :child_timeout are
# handed to the superclass together with the caller's configuration.
#
# @param [Hash] configuration Configuration options hash.
def initialize(configuration={})
  defaults = {
    :max_forks        => MAX_FORKS,
    :raise_exceptions => true,
    :child_timeout    => DEFAULT_CHILD_TIMEOUT
  }
  super(defaults, configuration)

  # At least one fork slot is required.
  if config.max_forks < 1
    log_and_raise(ParallelError, "max_forks must be equal to or greater than one!")
  end

  @forks = []
  @results = []

  # Only some Ruby builds expose this GC switch; enable it when it exists.
  GC.copy_on_write_friendly = true if GC.respond_to?(:copy_on_write_friendly=)

  # Forward each trapped signal to the children; every signal except INT
  # then terminates this process immediately with status 1.
  TRAPPED_SIGNALS.each do |signal|
    Signal.trap(signal) do
      $stderr.puts("SIG#{signal} received by PID##{Process.pid}; signaling child processes...")

      signal_all(signal)

      exit!(1) unless signal == "INT"
    end
  end

  # Send TERM to any tracked children when this process exits.
  Kernel.at_exit { signal_all('TERM') }
end

Public Instance Methods

count() click to toggle source

Count of active forks.

@return [Integer] Current number of active forks.

# File lib/ztk/parallel.rb, line 266
# Number of forks currently being tracked.
#
# @return [Integer] Current number of active forks.
def count
  active = @forks.count
  config.ui.logger.debug { "count(#{active})" }
  active
end
pids() click to toggle source

Child PIDs

@return [Array<Integer>] An array of child PIDs, if any.

# File lib/ztk/parallel.rb, line 274
# PIDs of the tracked child processes.
#
# @return [Array<Integer>] An array of child PIDs, if any.
def pids
  @forks.map { |child| child[:pid] }
end
process(&block) click to toggle source

Process in parallel.

@yield Block should execute tasks to be performed in parallel. @yieldreturn [Object] Block can return any object to be marshalled back to

the parent process's result set.

@return [Integer] Returns the pid of the child process forked.

# File lib/ztk/parallel.rb, line 131
# Runs the given block in a forked child process and starts tracking the
# child in @forks.
#
# The child's non-nil block result (or a wrapped exception) is marshalled,
# deflated, Base64 encoded and written to a pipe for the parent to read.
#
# @yield Block containing the work to perform in the child process.
# @return [Integer] The pid of the forked child process.
def process(&block)
  # A block is mandatory; log_and_raise is defined outside this listing
  # (presumably it logs and raises ParallelError - confirm).
  !block_given? and log_and_raise(ParallelError, "You must supply a block to the process method!")

  config.ui.logger.debug { "forks(#{@forks.inspect})" }

  # Throttle: reap finished children until a fork slot is free.
  while (@forks.count >= config.max_forks) do
    wait
  end

  # Two pipes are created. The second carries the child's result back to
  # the parent; nothing in this method writes to parent_writer.
  child_reader, parent_writer = IO.pipe
  parent_reader, child_writer = IO.pipe

  # Called in the parent, with the parent's pid, before forking.
  config.before_fork and config.before_fork.call(Process.pid)
  pid = Process.fork do
    begin
      # In the child, replace the trapped signal handlers with empty blocks.
      TRAPPED_SIGNALS.each { |signal| Signal.trap(signal) { } }

      # The child does not use the parent's pipe ends.
      parent_writer.close
      parent_reader.close

      # Called in the child, with the child's pid.
      config.after_fork and config.after_fork.call(Process.pid)

      data = nil
      begin
        # Run the caller's block under the configured child timeout.
        ::Timeout.timeout(config.child_timeout, ZTK::Parallel::Timeout) do
          data = block.call
        end
      rescue Exception => e
        # Any exception from the block (including the timeout) is logged and
        # wrapped so it can be sent to the parent as the result.
        config.ui.logger.fatal { e.message }
        e.backtrace.each do |line|
          config.ui.logger << "#{line}\n"
        end
        data = ExceptionWrapper.new(e)
      end

      # A nil result writes nothing to the pipe.
      if !data.nil?
        begin
          encoded_data = Base64.encode64(Zlib::Deflate.deflate(Marshal.dump(data)))
          config.ui.logger.debug { "write(#{encoded_data.length}B: #{data.inspect})" }
          # NOTE(review): the whole payload is written before the child
          # exits, while wait() only reads after Process.wait2 returns. A
          # payload larger than the pipe buffer may block here - confirm.
          child_writer.write(encoded_data)
        rescue Exception => e
          config.ui.logger.warn { "Exception while writing data to child_writer! - #{e.inspect}" }
        end
      end

    rescue Exception => e
      config.ui.logger.fatal { "Exception in Child Process Handler: #{e.inspect}" }

    ensure
      child_reader.close rescue nil
      child_writer.close rescue nil

      # The child always leaves through here with status 0, even after a
      # rescued failure above.
      Process.exit!(0)
    end
  end
  # Called again in the parent; Process.pid here is the parent's pid.
  config.after_fork and config.after_fork.call(Process.pid)

  # The parent does not use the child's pipe ends.
  child_reader.close
  child_writer.close

  # Record the child so wait/waitall/signal_all can find it.
  fork = {:reader => parent_reader, :writer => parent_writer, :pid => pid}
  @forks << fork

  pid
end
signal_all(signal="KILL") click to toggle source

Signals all forks.

@return [Integer] The number of processes signaled.

# File lib/ztk/parallel.rb, line 248
# Sends a signal to every tracked fork on a best-effort basis.
#
# Errors raised while signaling an individual child are ignored and that
# child is not counted.
#
# @param [String, Integer] signal Signal to deliver (defaults to "KILL").
# @return [Integer] The number of processes signaled.
def signal_all(signal="KILL")
  return 0 if @forks.nil? || @forks.empty?

  @forks.inject(0) do |delivered, child|
    begin
      Process.kill(signal, child[:pid])
      delivered + 1
    rescue
      delivered
    end
  end
end
wait(flags=0) click to toggle source

Wait for a single fork to finish.

If a fork successfully finishes, its return value from the process block is stored into the main result set.

@return [Array<pid, status, data>] An array containing the pid,

status and data returned from the process block.  If wait2() fails nil
is returned.
# File lib/ztk/parallel.rb, line 205
# Wait for a single fork to finish.
#
# When a tracked child is reaped, its pipe is read and decoded (Base64 ->
# inflate -> Marshal). The child is then removed from the tracked forks and
# its pipes are closed before the result is passed to process_data. A
# non-nil result is appended to the result set.
#
# @param [Integer] flags Extra Process.wait2 flags (e.g. Process::WNOHANG).
#   They are combined with Process::WUNTRACED, so the default of 0 behaves
#   as before.
# @return [Array<pid, status, data>, nil] The pid, status and data returned
#   from the process block. nil when there are no forks, when wait2 fails or
#   reports nothing, or when the reaped pid is not tracked.
# @raise [Exception] Whatever process_data raises for the child's result.
def wait(flags=0)
  config.ui.logger.debug { "wait" }
  config.ui.logger.debug { "forks(#{@forks.inspect})" }

  return nil if @forks.count <= 0

  # Honour the caller's flags; previously the argument was silently ignored.
  pid, status = begin
    Process.wait2(-1, flags | Process::WUNTRACED)
  rescue StandardError
    nil
  end
  return nil if pid.nil? || status.nil?

  child = @forks.find { |f| f[:pid] == pid }
  return nil if child.nil?

  data = nil
  begin
    payload = child[:reader].read.to_s
    # A child whose block returned nil writes nothing; an empty pipe is a
    # valid nil result, not a decoding failure.
    unless payload.empty?
      data = Marshal.load(Zlib::Inflate.inflate(Base64.decode64(payload).to_s))
    end
  rescue Zlib::BufError
    config.ui.logger.fatal { "Encountered Zlib::BufError when reading child pipe." }
  ensure
    # Always release the pipes and forget the reaped child, so a failure
    # below cannot leave a dead pid in @forks.
    child[:reader].close unless child[:reader].closed?
    child[:writer].close unless child[:writer].closed?
    @forks -= [child]
  end
  config.ui.logger.debug { "read(#{data.inspect})" }

  data = process_data(data)
  @results.push(data) unless data.nil?

  [pid, status, data]
end
waitall() click to toggle source

Waits for all forks to finish.

@return [Array<Object>] The results from all of the process blocks.

# File lib/ztk/parallel.rb, line 237
# Blocks until no tracked forks remain, reaping them one at a time.
#
# @return [Array<Object>] The results from all of the process blocks.
def waitall
  config.ui.logger.debug { "waitall" }
  wait until @forks.count.zero?
  @results
end

Private Instance Methods

process_data(data) click to toggle source
# File lib/ztk/parallel.rb, line 281
# Unwraps a value received from a child process.
#
# Plain values pass through untouched. A wrapped exception is re-raised,
# after signaling all forks, when raise_exceptions is exactly true or when
# the exception is a Break or Timeout. Otherwise it is logged as a warning
# and returned as the value.
#
# @param [Object] data Decoded value read from the child.
# @return [Object] The value, or the unwrapped exception when not raising.
def process_data(data)
  case data
  when ZTK::Parallel::ExceptionWrapper
    error = data.exception

    if (config.raise_exceptions == true) ||
        [ZTK::Parallel::Break, ZTK::Parallel::Timeout].any? { |klass| klass === error }
      config.ui.logger.fatal { "exception(#{error.inspect})" }
      signal_all
      raise error
    end

    config.ui.logger.warn { "exception(#{error.inspect})" }
    error
  else
    data
  end
end