class NixAdm::Pipeline

This class represents a UNIX pipeline. It provides two different implementations: a classic version which works exacly like the shell (Pipeline#classic()) and a conservative version that ensures all commands in the pipeline must succeed in order to continue (Pipeline#strict()). Ultimately both implementations work using the Process.spawn() method. They differ in how data flows between processes and how the pipeline behaves in the event of error.

Examples:

sys = NixAdm::Pipeline.new()
sys.run('ssh jan zfs list -t snapshot')
puts sys.out
puts sys.stats

# Strict mode:
sys.mode = :strict
sys.run('ssh jan zfs list -t snapshot')
puts sys.out

# Tempfile
file = Tempfile.new('test')
file.write('tempfile')
sys.run([ './one.rb', './two.rb' ], file)
puts pipeline.out

# OS File
file = File.open('/tmp/data.txt')
pipeline.run([ './one.rb', './two.rb' ], file)
puts pipeline.out

Attributes

debug[RW]
error[R]
mode[RW]
opts[RW]
out[R]
stats[R]
testing[R]
throw_on_fail[RW]

Public Class Methods

new() click to toggle source
# File src/lib/nixadm/pipeline.rb, line 126
def initialize()
  @opts  = {}
  @stats = []
  @mode  = :classic
  @debug = false
  @error = nil
  @throw_on_fail = true
end

Public Instance Methods

classic(args, input=nil) { |stdin, stdout| ... } click to toggle source

Runs one or more commands sequentially in a class UNIX pipeline. This is a true UNIX pipeline. It starts each command in parallel and data flows through each using UNIX pipes. If a command fails, the pipeline will continue until the last command exists.

@param commands A string containing a single command or an array of multiple commands.

@param input Standard in to the first process in pipeline. This can be the a string or IO object.

@return Returns true if all commands in pipeline completed successfully, false otherwise. Standard out of the last command it stored in the @out member while standard error is stored in @error. Process status info for each command is stored in @stats.

# File src/lib/nixadm/pipeline.rb, line 166
def classic(args, input=nil)

  if args.is_a? String
    args = [ args ]
  end

  if @debug
    $stderr.puts args
  end

  stderr, ewrite = IO.pipe
  args << { :err=>ewrite }

  Open3.pipeline_rw(*args) do |stdin, stdout, threads|
    if block_given?
      yield stdin, stdout
    else
      if not input.nil?
        if input.is_a?(Tempfile) or input.is_a?(IO)
          input.seek(0)
          stdin.write(input.read())
        end

        if input.is_a?(String)
          stdin.write(input)
        end
      end

      stdin.close()

      # Collect stdout
      @out = ''
      while true
        line = stdout.gets
        break if line.nil?
        @out += line
      end

    end

    threads.each do |t|
      @stats << t.value
    end
  end

  ewrite.close()

  # Collect stderr
  @error = ''
  while true
    line = stderr.gets
    break if line.nil?
    @error += line
  end

  result = @stats.all? { |s| s.exitstatus == 0 }

  if result == false and @throw_on_fail
    # Assemble command-line representation of pipeline
    cmd_line = []
    args.each do |e|
      if e.class == String
        cmd_line << e
      elsif e.class == Array
        cmd_line << e.join(' ')
      end
    end

    raise "Command failed: #{cmd_line.join(' | ')}\n #{error()}"
  end

  return result
end
closeTempfiles(files) click to toggle source
# File src/lib/nixadm/pipeline.rb, line 375
def closeTempfiles(files)
  files.each do |file|
    if not file.nil? and file.is_a?(Tempfile)
      file.close()
      file.unlink()
    end
  end
end
run(commands, input=nil, &block) click to toggle source

Run a UNIX pipeline. This selects between the two pipeline implementations: sequential or parallel depending on the @mode value. If @mode is :classic, it runs the classic() implementation, otherwise it runs the strict() implementation. Ultimately both implementations work using the Process.spawn() method. They differ in how data flows between processes and how the pipeline behaves in the event of error.

# File src/lib/nixadm/pipeline.rb, line 142
def run(commands, input=nil, &block)
  if @mode == :classic
    return classic(commands, input, &block)
  else
    return strict(commands, input, &block)
  end
end
strict(commands, input=nil) { |in_write| ... } click to toggle source

Runs one or more commands sequentially in a pipeline. This is not a true UNIX pipeline in that it's more conservative. It runs each command fully to exit and ensures that it completed successfully (with 0 exit status). If a command fails, it stops the pipeline and return false. Use this form when you need to ensure that every command in the pipeline must succeed before calling the next.

Rather than using UNIX pipes to connect commands, this uses temporary files. The standard out of each process is piped to a temporary file which in turn is used as the standard in for the next command.

@param commands A string containing a single command or an array of multiple commands.

@param input Standard in to the first process in pipeline. This can be the a string or IO object.

@return Returns true if all commands in pipeline completed successfully, false otherwise. Standard out of the last command it stored in the @out member while standard error is stored in @error. Process status info for each command is stored in @stats.

# File src/lib/nixadm/pipeline.rb, line 262
def strict(commands, input=nil)

  opts_base = @opts.dup
  in_read, in_write = IO.pipe()

  r = nil
  w = nil
  e = nil

  if commands.is_a? String
    commands = [ commands ]
  end

  commands.each_with_index do |cmd, i|
    cmd_opts = opts_base.dup

    if String === cmd
      cmd = [cmd]
    else
      cmd_opts.update cmd.pop if Hash === cmd.last
    end

    # Read stream
    if i == 0
      r = in_read
    else
      if r != in_read
        # Close previous tempfile
        r.close()
        r.unlink() if r.is_a?(Tempfile)
      end

      r = w

      if r.is_a?(Tempfile)
        r.seek(0)
      end
    end

    # Write stream
    w = Tempfile.new(tempfile_name())

    # Close previous stderr tempfile
    if not e.nil?
      e.close()
      e.unlink()
    end

    e = Tempfile.new(tempfile_name())

    cmd_opts[:in]  = r
    cmd_opts[:out] = w
    cmd_opts[:err] = e

    if @debug
      $stderr.puts cmd
    end

    pid = Process.spawn(*cmd, cmd_opts)

    if i == 0
      if block_given?
        yield in_write
      else
        if not input.nil?
          if input.is_a?(Tempfile) or input.is_a?(IO)
            input.seek(0)
            in_write.write(input.read())
          end

          if input.is_a?(String)
            in_write.write(input)
          end
        end
      end

      in_read.close()
      in_write.close()
    end

    Process.wait(pid)

    @stats << $?

    if $? != 0
      e.seek(0)
      @error = "Process failed #{cmd}: #{e.read()}"
      e.close()
      e.unlink()

      closeTempfiles([r,w,e])

      return false
    end
  end

  # Collect output
  w.seek(0)
  @out = w.read()
  w.close()
  w.unlink()

  closeTempfiles([r,w,e])

  return true
end
tempfile_name() click to toggle source
# File src/lib/nixadm/pipeline.rb, line 369
def tempfile_name()
  value = '';
  8.times{value << (65 + rand(25)).chr}
  value
end