class EC2::Platform::Base::Pipeline

Attributes

basename[R]
verbose[RW]

Public Class Methods

new(basename='pipeline', is_verbose=false) click to toggle source
# File lib/ec2/platform/base/pipeline.rb, line 71
# Set up an empty pipeline: no stages, no results and no temp files yet.
#
# basename::   label stored for later use when naming the per-stage
#              temp files and when constructing execution errors
# is_verbose:: stored as the +verbose+ flag; when truthy, #execute echoes
#              the command line and its output to STDERR
def initialize(basename='pipeline', is_verbose=false)
  @stages, @results, @tempfiles = [], [], []
  @basename = basename
  @verbose = is_verbose
end

Public Instance Methods

add(name, command, success=0) click to toggle source
# File lib/ec2/platform/base/pipeline.rb, line 80
# Append a single stage to the end of the pipeline.
#
# name::    identifier of the stage
# command:: the command the stage runs
# success:: the exit status that counts as success for this stage
#           (defaults to 0)
#
# Returns the pipeline itself so that calls can be chained.
def add(name, command, success=0)
  stage = Stage.new(name, command, success)
  @stages.push(stage)
  self
end
cleanup() click to toggle source
# File lib/ec2/platform/base/pipeline.rb, line 148
# Close and delete every temp file tracked by this pipeline.
#
# The path of each file is captured *before* the file is closed:
# Tempfile#path returns nil once the file has been unlinked, and passing
# nil to File.exist? raises TypeError, which would abort the cleanup at
# the first entry.
#
# Entries that are not Tempfile instances are not closed here; their
# path is simply removed if it still exists.
def cleanup
  @tempfiles.each do |file|
    path = file.path
    file.close(true) if file.is_a? Tempfile
    # Tempfile#close(true) normally removes the file already; this is a
    # fallback for leftovers and for non-Tempfile entries.
    FileUtils.rm_f(path) if path && File.exist?(path)
  end
end
command() click to toggle source
# File lib/ec2/platform/base/pipeline.rb, line 104
# Build the shell command line for the whole pipeline.
#
# The commands of all stages are joined with ' | ' and terminated with
# '; '. That string is then handed to #pipestatus, whose return value is
# the result of this method.
def command
  stage_commands = @stages.map { |stage| stage.command }
  chained = "#{stage_commands.join(' | ')}; "

  # Let pipestatus decorate the pipeline so that the return code of each
  # stage can be recovered afterwards.
  pipestatus(chained)
end
concat(arr) click to toggle source
# File lib/ec2/platform/base/pipeline.rb, line 86
# Add several stages at once.
#
# arr:: an Array whose elements are indexable as
#       [name, command, success]; a missing (nil/false) third element
#       falls back to 0. Anything that is not an Array is ignored.
#
# Returns the pipeline itself so that calls can be chained.
def concat(arr)
  return self unless arr.is_a?(Array)

  arr.each do |entry|
    stage_name = entry[0]
    stage_command = entry[1]
    expected_status = entry[2] || 0
    add(stage_name, stage_command, expected_status)
  end
  self
end
create_tempfiles() click to toggle source
# File lib/ec2/platform/base/pipeline.rb, line 161
# Create one temp file per stage and keep them in @tempfiles.
#
# Each file is named "<basename>-pipestatus-<index>" (plus whatever
# suffix Tempfile appends) and is closed immediately without being
# unlinked, so it stays on disk and can be reopened later.
#
# Raises ExecutionError if the number of files created differs from the
# number of stages.
def create_tempfiles
  @tempfiles = Array.new(@stages.length) do |position|
    status_file = Tempfile.new("#{@basename}-pipestatus-#{position}")
    status_file.close(false)
    status_file
  end
  return if @tempfiles.length == @stages.length

  raise ExecutionError.new(
    @basename, nil,
    "Temp files count(#{@tempfiles.length}) != stages count(#{@stages.length})")
end
errors() click to toggle source
# File lib/ec2/platform/base/pipeline.rb, line 156
# Return the recorded stage results whose +success+ value is falsy
# (false or nil), in their original order.
def errors
  @results.select { |outcome| !outcome.success }
end
execute() click to toggle source
# File lib/ec2/platform/base/pipeline.rb, line 114
# Run the pipeline through /bin/bash and return what it wrote to stdout.
#
# Steps:
# 1. create one status temp file per stage (#create_tempfiles);
# 2. build the command line (#command), escape its single quotes and
#    run it via <tt>/bin/bash -c '...'</tt>;
# 3. read one integer status per stage back from the temp files and
#    compare it with the status the stage declared as success.
#
# The per-stage outcomes are stored in @results (see #errors).
#
# Raises ExecutionError with only the pipeline basename when the bash
# invocation itself reports failure, and with the basename plus the name
# of the *first* failing stage when a stage status does not match.
#
# NOTE(review): $CHILD_STATUS is provided by the 'English' library,
# presumably required elsewhere in this file -- confirm.
def execute()
  @results = []
  create_tempfiles
  # Close the single-quoted string, emit a double-quoted quote, reopen it.
  escaped_command = command.gsub("'","'\"'\"'")
  invocation = "/bin/bash -c '#{escaped_command}'"

  # Execute the pipeline invocation
  STDERR.puts("Pipeline.execute: command = [#{invocation}]") if verbose
  output = `#{invocation}`
  STDERR.puts("Pipeline.execute: output = [#{output.strip}]") if verbose

  unless $CHILD_STATUS.success?
    raise ExecutionError.new(@basename)
  end

  # Collect the pipeline's exit codes and see if they're good
  successful = true
  offender = nil
  @results = @tempfiles.zip(@stages).map do |file, stage|
    file.open()
    begin
      status = file.read().strip.to_i
    ensure
      # Always release the handle, even if reading fails.
      file.close(false)
    end
    success = (stage.success == status)
    # Remember only the first stage that failed; later stages (which may
    # well have succeeded) must not overwrite it.
    offender = stage.name if successful && !success
    successful &&= success
    Stage::Result.new(stage.name, status, success)
  end
  unless successful
    raise ExecutionError.new(@basename, offender)
  end
  output
end
pipestatus(cmd) click to toggle source
# File lib/ec2/platform/base/pipeline.rb, line 99
# Hook used by #command to wrap the joined pipeline string so that the
# return code of every stage becomes accessible.
#
# This implementation always raises RuntimeError.
# NOTE(review): presumably platform-specific subclasses override this
# method -- confirm.
#
# cmd:: the joined pipeline command string
def pipestatus(cmd)
  raise RuntimeError, 'unimplemented method'
end
to_s() click to toggle source
# File lib/ec2/platform/base/pipeline.rb, line 175
# Human-readable summary of the pipeline: the stages and the results
# recorded so far, each rendered as a comma-separated list.
def to_s
  stage_list = @stages.join(', ')
  result_list = @results.join(', ')
  "Pipeline(stages=[#{stage_list}], results=[#{result_list}])"
end