class Commands::StreamStepCommand

Constants

GENERIC_OPTIONS

Attributes

args[RW]
cache[RW]
cache_archive[RW]
input[RW]
jobconf[RW]
mapper[RW]
output[RW]
reducer[RW]

Public Class Methods

new(*args) click to toggle source
Calls superclass method Commands::StepCommand::new
# File lib/commands.rb, line 517
# Builds the command by handing every constructor argument to the
# superclass initializer unchanged, then starts with an empty jobconf list.
def initialize(*args)
  # A bare `super` forwards exactly the arguments this method received.
  super
  @jobconf = []
end

Public Instance Methods

sort_streaming_args(streaming_args) click to toggle source
# File lib/commands.rb, line 555
# Reorders streaming arguments so that every generic option (any flag listed
# in GENERIC_OPTIONS) and the value following it come before all other
# arguments.
#
# The partition is stable: generic option pairs keep the order in which they
# were given, and so do the remaining arguments. Keeping the given order
# matters when the same option is supplied more than once.
#
# streaming_args - list of argument strings; nil (or false) is treated as an
#                  empty list.
#
# Returns a new array; the input is not modified.
# Raises RuntimeError if a generic option is the last argument and therefore
# has no value.
def sort_streaming_args(streaming_args)
  args = streaming_args || []
  generic_args = []
  other_args = []

  # An explicit index is used because a generic option consumes two slots
  # (the flag and its value) while any other argument consumes one.
  index = 0
  while index < args.length
    arg = args[index]
    if GENERIC_OPTIONS.include?(arg)
      if index + 1 >= args.length
        raise RuntimeError, "Missing value for argument #{arg}"
      end
      generic_args << arg << args[index + 1]
      index += 2
    else
      other_args << arg
      index += 1
    end
  end

  generic_args + other_args
end
steps() click to toggle source
# File lib/commands.rb, line 522
# Builds the description of a single Hadoop streaming step and returns it
# wrapped in a one-element array.
#
# Values are read through get_field (defined outside this block; presumably
# it returns the stored field or the supplied default -- confirm against its
# definition). The fallback values used here are the wordcount sample input
# and mapper, the "aggregate" reducer, and a timestamped HDFS output path.
def steps
  run_stamp = Time.now.strftime("%Y-%m-%dT%H%M%S")

  # Each configured value is emitted as a "<flag> <value>" pair.
  stream_options = []
  [
    ["-cacheFile",    :cache],
    ["-cacheArchive", :cache_archive],
    ["-jobconf",      :jobconf]
  ].each do |flag, field|
    get_field(field, []).each { |value| stream_options.push(flag, value) }
  end

  step_name  = get_field(:step_name, "Example Streaming Step")
  on_failure = get_field(:step_action, "CANCEL_AND_WAIT")

  # Note that the streaming options should go before command options for
  # Hadoop 0.20
  jar_args = sort_streaming_args(get_field(:args)) + stream_options
  jar_args += [
    "-input",   get_field(:input, "s3n://elasticmapreduce/samples/wordcount/input"),
    "-output",  get_field(:output, "hdfs:///examples/output/#{run_stamp}"),
    "-mapper",  get_field(:mapper, "s3n://elasticmapreduce/samples/wordcount/wordSplitter.py"),
    "-reducer", get_field(:reducer, "aggregate")
  ]

  [
    {
      "Name"            => step_name,
      "ActionOnFailure" => on_failure,
      "HadoopJarStep"   => {
        "Jar"  => "/home/hadoop/contrib/streaming/hadoop-streaming.jar",
        "Args" => jar_args
      }
    }
  ]
end