class NixAdm::Pipeline
This class represents a UNIX pipeline. It provides two implementations: a classic version that works exactly like the shell (Pipeline#classic()) and a conservative version that requires every command in the pipeline to succeed before the next one runs (Pipeline#strict()). Ultimately both implementations work using the Process.spawn() method. They differ in how data flows between processes and how the pipeline behaves in the event of an error.
Examples:
  sys = NixAdm::Pipeline.new()
  sys.run('ssh jan zfs list -t snapshot')
  puts sys.out
  puts sys.stats

  # Strict mode:
  sys.mode = :strict
  sys.run('ssh jan zfs list -t snapshot')
  puts sys.out

  # Tempfile
  file = Tempfile.new('test')
  file.write('tempfile')
  sys.run([ './one.rb', './two.rb' ], file)
  puts sys.out

  # OS File
  file = File.open('/tmp/data.txt')
  sys.run([ './one.rb', './two.rb' ], file)
  puts sys.out
Attributes
Public Class Methods
# File src/lib/nixadm/pipeline.rb, line 126
def initialize()
  @opts          = {}
  @stats         = []
  @mode          = :classic
  @debug         = false
  @error         = nil
  @throw_on_fail = true
end
Public Instance Methods
Runs one or more commands in a classic UNIX pipeline. This is a true UNIX pipeline: it starts the commands in parallel, and data flows from each command to the next through UNIX pipes. If a command fails, the pipeline continues until the last command exits.
@param commands A string containing a single command or an array of multiple commands.
@param input Standard in to the first process in the pipeline. This can be a string, an IO object, or a Tempfile.
@return Returns true if all commands in the pipeline completed successfully, false otherwise. When @throw_on_fail is set (the default), a failure raises an exception instead of returning false. Standard out of the last command is stored in the @out member, while standard error is stored in @error. Process status info for each command is stored in @stats.
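The behaviour described above can be sketched with Open3.pipeline_rw, the standard-library call that classic() is built on. This is a minimal illustration, not the class's own code: classic_sketch is a hypothetical helper, and it returns a hash rather than setting @out, @error and @stats.

```ruby
require 'open3'

# Hypothetical helper (not part of NixAdm::Pipeline): runs the commands as a
# parallel pipeline and reports output, stderr and per-command status.
def classic_sketch(commands, input = nil)
  err_read, err_write = IO.pipe
  out = ''
  statuses = []

  # All commands start at once; err: is passed through to every spawn call.
  Open3.pipeline_rw(*commands, err: err_write) do |stdin, stdout, threads|
    # Fine for small inputs; a large input would need a writer thread.
    stdin.write(input) unless input.nil?
    stdin.close
    out = stdout.read
    statuses = threads.map(&:value) # one Process::Status per command
  end

  # Close our copy of the write end so the read below sees end-of-file.
  err_write.close
  err = err_read.read
  err_read.close

  { ok: statuses.all?(&:success?), out: out, err: err, statuses: statuses }
end

result = classic_sketch(['tr a-z A-Z', 'sort'], "pear\napple\n")
puts result[:out]

# The first command fails, yet the second still runs to completion.
failed = classic_sketch(['false', 'echo done'])
puts failed[:ok]
puts failed[:out]
```

Because every command is already running when the first one fails, the only way to detect the failure is to inspect the collected statuses afterwards.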
# File src/lib/nixadm/pipeline.rb, line 166
def classic(args, input=nil)
  if args.is_a? String
    args = [ args ]
  end

  if @debug
    $stderr.puts args
  end

  stderr, ewrite = IO.pipe

  args << { :err=>ewrite }

  Open3.pipeline_rw(*args) do |stdin, stdout, threads|
    if block_given?
      yield stdin, stdout
    else
      if not input.nil?
        if input.is_a?(Tempfile) or input.is_a?(IO)
          input.seek(0)
          stdin.write(input.read())
        end

        if input.is_a?(String)
          stdin.write(input)
        end
      end

      stdin.close()

      # Collect stdout
      @out = ''
      while true
        line = stdout.gets
        break if line.nil?
        @out += line
      end
    end

    threads.each do |t|
      @stats << t.value
    end
  end

  ewrite.close()

  # Collect stderr
  @error = ''
  while true
    line = stderr.gets
    break if line.nil?
    @error += line
  end

  result = @stats.all? { |s| s.exitstatus == 0 }

  if result == false and @throw_on_fail
    # Assemble command-line representation of pipeline
    cmd_line = []
    args.each do |e|
      if e.class == String
        cmd_line << e
      elsif e.class == Array
        cmd_line << e.join(' ')
      end
    end

    raise "Command failed: #{cmd_line.join(' | ')}\n #{error()}"
  end

  return result
end
# File src/lib/nixadm/pipeline.rb, line 375
def closeTempfiles(files)
  files.each do |file|
    if not file.nil? and file.is_a?(Tempfile)
      file.close()
      file.unlink()
    end
  end
end
Runs a UNIX pipeline. This selects between the two pipeline implementations, parallel (classic) or sequential (strict), depending on the @mode value. If @mode is :classic, it runs the classic() implementation; otherwise it runs the strict() implementation. Ultimately both implementations work using the Process.spawn() method. They differ in how data flows between processes and how the pipeline behaves in the event of an error.
# File src/lib/nixadm/pipeline.rb, line 142
def run(commands, input=nil, &block)
  if @mode == :classic
    return classic(commands, input, &block)
  else
    return strict(commands, input, &block)
  end
end
Runs one or more commands sequentially in a pipeline. This is not a true UNIX pipeline; it is more conservative. It runs each command to completion and checks that it exited successfully (with exit status 0). If a command fails, it stops the pipeline and returns false. Use this form when every command in the pipeline must succeed before the next one is called.
Rather than using UNIX pipes to connect commands, this uses temporary files. The standard out of each process is written to a temporary file, which in turn is used as the standard in for the next command.
@param commands A string containing a single command or an array of multiple commands.
@param input Standard in to the first process in the pipeline. This can be a string, an IO object, or a Tempfile.
@return Returns true if all commands in the pipeline completed successfully, false otherwise. Standard out of the last command is stored in the @out member, while standard error is stored in @error. Process status info for each command is stored in @stats.
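The tempfile hand-off described above can be sketched with Process.spawn and Tempfile from the standard library. This is a simplified illustration under stated assumptions, not the class's own code: strict_sketch is a hypothetical helper, it accepts only string input, it does not capture standard error, and it returns a hash rather than setting instance variables.

```ruby
require 'tempfile'

# Hypothetical helper (not part of NixAdm::Pipeline): runs commands one at a
# time, feeding each command's output file to the next command as stdin.
def strict_sketch(commands, input = '')
  statuses = []

  # The first command reads the caller's input from a temporary file.
  current = Tempfile.new('stage')
  current.write(input)
  current.flush

  commands.each do |cmd|
    current.rewind
    next_file = Tempfile.new('stage')

    # Run the command to completion before looking at the next one.
    pid = Process.spawn(cmd, in: current, out: next_file)
    _, status = Process.wait2(pid)
    statuses << status

    current.close! # close and delete the consumed input file
    current = next_file

    # Stop at the first failure; later commands never start.
    break unless status.success?
  end

  ok = statuses.all?(&:success?)
  current.rewind
  out = ok ? current.read : nil
  current.close!

  { ok: ok, out: out, statuses: statuses }
end

result = strict_sketch(['tr a-z A-Z', 'sort'], "pear\napple\n")
puts result[:out]

# 'false' exits non-zero, so 'echo never' is not started.
failed = strict_sketch(['false', 'echo never'])
puts failed[:statuses].length
```

The trade-off is that no command starts until its predecessor has finished, so the stages cannot overlap and every intermediate result is written to disk.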
# File src/lib/nixadm/pipeline.rb, line 262
def strict(commands, input=nil)
  opts_base = @opts.dup

  in_read, in_write = IO.pipe()

  r = nil
  w = nil
  e = nil

  if commands.is_a? String
    commands = [ commands ]
  end

  commands.each_with_index do |cmd, i|
    cmd_opts = opts_base.dup

    if String === cmd
      cmd = [cmd]
    else
      cmd_opts.update cmd.pop if Hash === cmd.last
    end

    # Read stream
    if i == 0
      r = in_read
    else
      if r != in_read
        # Close previous tempfile
        r.close()
        r.unlink() if r.is_a?(Tempfile)
      end

      r = w

      if r.is_a?(Tempfile)
        r.seek(0)
      end
    end

    # Write stream
    w = Tempfile.new(tempfile_name())

    # Close previous stderr tempfile
    if not e.nil?
      e.close()
      e.unlink()
    end

    e = Tempfile.new(tempfile_name())

    cmd_opts[:in]  = r
    cmd_opts[:out] = w
    cmd_opts[:err] = e

    if @debug
      $stderr.puts cmd
    end

    pid = Process.spawn(*cmd, cmd_opts)

    if i == 0
      if block_given?
        yield in_write
      else
        if not input.nil?
          if input.is_a?(Tempfile) or input.is_a?(IO)
            input.seek(0)
            in_write.write(input.read())
          end

          if input.is_a?(String)
            in_write.write(input)
          end
        end
      end

      in_read.close()
      in_write.close()
    end

    Process.wait(pid)
    @stats << $?

    if $? != 0
      e.seek(0)
      @error = "Process failed #{cmd}: #{e.read()}"
      e.close()
      e.unlink()
      closeTempfiles([r,w,e])

      return false
    end
  end

  # Collect output
  w.seek(0)
  @out = w.read()
  w.close()
  w.unlink()

  closeTempfiles([r,w,e])

  return true
end
# File src/lib/nixadm/pipeline.rb, line 369
def tempfile_name()
  value = ''; 8.times{value << (65 + rand(25)).chr}
  value
end