class ZTK::Parallel
Parallel
Processing Class
This class can be used to easily run iterative and linear processes in a parallel manner.
The before fork callback is called once in the parent process.
The after fork callback is called twice, once in the parent process and once in the child process.
@example Parallel
processing with callbacks
a_callback = Proc.new do |pid| puts "Hello from After Callback - PID #{pid}" end b_callback = Proc.new do |pid| puts "Hello from Before Callback - PID #{pid}" end parallel = ZTK::Parallel.new parallel.config do |config| config.before_fork = b_callback config.after_fork = a_callback end puts Process.pid.inspect 3.times do |x| parallel.process do x end end parallel.waitall puts parallel.results.inspect
@author Zachary Patten <zpatten AT jovelabs DOT io>
Constants
- DEFAULT_CHILD_TIMEOUT
Child process timeout in seconds; <= 0 to disable
- MAX_FORKS
Default Maximum Number of Forks
- MAX_MEMORY
Platforms memory capacity in bytes
- TRAPPED_SIGNALS
Attributes
Result Set
Public Class Methods
@param [Hash] configuration Configuration options hash. @option config [Integer] :max_forks Maximum number of forks to use. @option config [Proc] :before_fork (nil) Proc to call before forking. @option config [Proc] :after_fork (nil) Proc to call after forking.
ZTK::Base::new
# File lib/ztk/parallel.rb, line 97 def initialize(configuration={}) super({ :max_forks => MAX_FORKS, :raise_exceptions => true, :child_timeout => DEFAULT_CHILD_TIMEOUT }, configuration) (config.max_forks < 1) and log_and_raise(ParallelError, "max_forks must be equal to or greater than one!") @forks = Array.new @results = Array.new GC.respond_to?(:copy_on_write_friendly=) and GC.copy_on_write_friendly = true TRAPPED_SIGNALS.each do |signal| Signal.trap(signal) do $stderr.puts("SIG#{signal} received by PID##{Process.pid}; signaling child processes...") signal_all(signal) (signal == "INT") or exit!(1) end end Kernel.at_exit do signal_all('TERM') end end
Public Instance Methods
Count of active forks.
@return [Integer] Current number of active forks.
# File lib/ztk/parallel.rb, line 266 def count config.ui.logger.debug { "count(#{@forks.count})" } @forks.count end
Child PIDs
@return [Array<Integer>] An array of child PIDs, if any.
# File lib/ztk/parallel.rb, line 274 def pids @forks.collect{ |fork| fork[:pid] } end
Process in parallel.
@yield Block should execute tasks to be performed in parallel. @yieldreturn [Object] Block can return any object to be marshalled back to
the parent processes result set.
@return [Integer] Returns the pid of the child process forked.
# File lib/ztk/parallel.rb, line 131 def process(&block) !block_given? and log_and_raise(ParallelError, "You must supply a block to the process method!") config.ui.logger.debug { "forks(#{@forks.inspect})" } while (@forks.count >= config.max_forks) do wait end child_reader, parent_writer = IO.pipe parent_reader, child_writer = IO.pipe config.before_fork and config.before_fork.call(Process.pid) pid = Process.fork do begin TRAPPED_SIGNALS.each { |signal| Signal.trap(signal) { } } parent_writer.close parent_reader.close config.after_fork and config.after_fork.call(Process.pid) data = nil begin ::Timeout.timeout(config.child_timeout, ZTK::Parallel::Timeout) do data = block.call end rescue Exception => e config.ui.logger.fatal { e.message } e.backtrace.each do |line| config.ui.logger << "#{line}\n" end data = ExceptionWrapper.new(e) end if !data.nil? begin encoded_data = Base64.encode64(Zlib::Deflate.deflate(Marshal.dump(data))) config.ui.logger.debug { "write(#{encoded_data.length}B: #{data.inspect})" } child_writer.write(encoded_data) rescue Exception => e config.ui.logger.warn { "Exception while writing data to child_writer! - #{e.inspect}" } end end rescue Exception => e config.ui.logger.fatal { "Exception in Child Process Handler: #{e.inspect}" } ensure child_reader.close rescue nil child_writer.close rescue nil Process.exit!(0) end end config.after_fork and config.after_fork.call(Process.pid) child_reader.close child_writer.close fork = {:reader => parent_reader, :writer => parent_writer, :pid => pid} @forks << fork pid end
Signals all forks.
@return [Integer] The number of processes signaled.
# File lib/ztk/parallel.rb, line 248 def signal_all(signal="KILL") signaled = 0 if (!@forks.nil? && (@forks.count > 0)) @forks.each do |fork| begin Process.kill(signal, fork[:pid]) signaled += 1 rescue nil end end end signaled end
Wait for a single fork to finish.
If a fork successfully finishes, it's return value from the process block is stored into the main result set.
@return [Array<pid, status, data>] An array containing the pid,
status and data returned from the process block. If wait2() fails nil is returned.
# File lib/ztk/parallel.rb, line 205 def wait(flags=0) config.ui.logger.debug { "wait" } config.ui.logger.debug { "forks(#{@forks.inspect})" } return nil if @forks.count <= 0 pid, status = (Process.wait2(-1, Process::WUNTRACED) rescue nil) if !pid.nil? && !status.nil? && !(fork = @forks.select{ |f| f[:pid] == pid }.first).nil? data = nil begin data = Marshal.load(Zlib::Inflate.inflate(Base64.decode64(fork[:reader].read).to_s)) rescue Zlib::BufError config.ui.logger.fatal { "Encountered Zlib::BufError when reading child pipe." } end config.ui.logger.debug { "read(#{data.inspect})" } data = process_data(data) !data.nil? and @results.push(data) fork[:reader].close fork[:writer].close @forks -= [fork] return [pid, status, data] end nil end
Waits for all forks to finish.
@return [Array<Object>] The results from all of the process blocks.
# File lib/ztk/parallel.rb, line 237 def waitall config.ui.logger.debug { "waitall" } while @forks.count > 0 self.wait end @results end
Private Instance Methods
# File lib/ztk/parallel.rb, line 281 def process_data(data) return data if !(ZTK::Parallel::ExceptionWrapper === data) if ((config.raise_exceptions == true) || (ZTK::Parallel::Break === data.exception) || (ZTK::Parallel::Timeout === data.exception)) config.ui.logger.fatal { "exception(#{data.exception.inspect})" } signal_all raise data.exception end config.ui.logger.warn { "exception(#{data.exception.inspect})" } return data.exception end