class Commands::StreamStepCommand
Constants
- GENERIC_OPTIONS
Attributes
args[RW]
cache[RW]
cache_archive[RW]
input[RW]
jobconf[RW]
mapper[RW]
output[RW]
reducer[RW]
Public Class Methods
new(*args)
click to toggle source
Calls superclass method
Commands::StepCommand::new
# File lib/commands.rb, line 517
#
# Creates a streaming step command.
#
# Every argument (and any block) is handed through untouched to
# Commands::StepCommand#initialize. Afterwards the jobconf list is reset to a
# fresh empty array, so -jobconf entries can be appended to it later.
def initialize(*args)
  # Bare `super` re-sends exactly the arguments this method received.
  super
  @jobconf = []
end
Public Instance Methods
sort_streaming_args(streaming_args)
click to toggle source
# File lib/commands.rb, line 555 def sort_streaming_args(streaming_args) sorted_streaming_args = [] i=0 while streaming_args && i < streaming_args.length if GENERIC_OPTIONS.include?(streaming_args[i]) then if i+1 < streaming_args.length sorted_streaming_args.unshift(streaming_args[i+1]) sorted_streaming_args.unshift(streaming_args[i]) i=i+2 else raise RuntimeError, "Missing value for argument #{streaming_args[i]}" end else sorted_streaming_args << streaming_args[i] i=i+1 end end return sorted_streaming_args end
steps()
click to toggle source
# File lib/commands.rb, line 522
#
# Builds the single Hadoop streaming step described by this command.
#
# Unset fields fall back to the defaults written below (a sample word-count
# job whose output directory is stamped with the current time).
#
# The jar argument list is assembled in three parts, in this order:
#   1. :args, reordered by sort_streaming_args so entries from GENERIC_OPTIONS
#      (with their values) come first;
#   2. -cacheFile / -cacheArchive / -jobconf pairs, one per configured entry;
#   3. the -input / -output / -mapper / -reducer pairs.
#
# @return [Array<Hash>] a one-element array holding the step description
def steps
  timestamp = Time.now.strftime("%Y-%m-%dT%H%M%S")

  stream_options = []
  get_field(:cache, []).each { |path| stream_options.push("-cacheFile", path) }
  get_field(:cache_archive, []).each { |path| stream_options.push("-cacheArchive", path) }
  get_field(:jobconf, []).each { |setting| stream_options.push("-jobconf", setting) }

  # Note that the streaming options should go before command options for
  # Hadoop 0.20
  step_name    = get_field(:step_name, "Example Streaming Step")
  on_failure   = get_field(:step_action, "CANCEL_AND_WAIT")
  leading_args = sort_streaming_args(get_field(:args))
  command_args = [
    "-input",   get_field(:input, "s3n://elasticmapreduce/samples/wordcount/input"),
    "-output",  get_field(:output, "hdfs:///examples/output/#{timestamp}"),
    "-mapper",  get_field(:mapper, "s3n://elasticmapreduce/samples/wordcount/wordSplitter.py"),
    "-reducer", get_field(:reducer, "aggregate")
  ]

  [
    {
      "Name" => step_name,
      "ActionOnFailure" => on_failure,
      "HadoopJarStep" => {
        "Jar" => "/home/hadoop/contrib/streaming/hadoop-streaming.jar",
        "Args" => leading_args + stream_options + command_args
      }
    }
  ]
end