module Commands

Constants

ELASTIC_MAPREDUCE_CLIENT_VERSION

Public Class Methods

add_commands(commands, opts) click to toggle source
# File lib/commands.rb, line 1422
  # Registers every command and option understood by the command line client.
  #
  # commands:: receives +opts=+ once, then one +parse_command+ call per command
  #            and one +parse_options+ call per group of options. The first
  #            argument of +parse_options+ is either the list of command
  #            switches the options are registered for, or +:global+.
  # opts::     receives a +separator+ call for each help section heading
  #            (create_and_execute_commands passes an OptionParser).
  #
  # Each option spec is an array of
  # [option class, switch, help text, field symbol (, validation pattern)].
  def self.add_commands(commands, opts)
    # FIXME: add --wait-for-step function

    commands.opts = opts

    # Switches of the step commands that all accept the generic step arguments below.
    step_commands = ["--jar", "--resize-jobflow", "--enable-debugging", "--hive-interactive", "--pig-interactive", "--hive-script", "--pig-script"]

    opts.separator "\n  Creating Job Flows\n"

    commands.parse_command(CreateJobFlowCommand, "--create", "Create a new job flow")
    commands.parse_options(["--create"], [
      [ OptionWithArg, "--name NAME",                 "The name of the job flow being created", :jobflow_name ],
      [ FlagOption,    "--alive",                     "Create a job flow that stays running even though it has executed all its steps", :alive ],
      # NOTE(review): declared as OptionWithArg although the switch names no
      # argument -- confirm whether FlagOption was intended.
      [ OptionWithArg, "--with-termination-protection",   "Create a job with termination protection (default is no termination protection)", :with_termination_protection ],
      [ OptionWithArg, "--with-supported-products PRODUCTS",   "Add supported products", :with_supported_products ],
      [ OptionWithArg, "--num-instances NUM",         "Number of instances in the job flow", :instance_count ],
      [ OptionWithArg, "--slave-instance-type TYPE",  "The type of the slave instances to launch", :slave_instance_type ],
      [ OptionWithArg, "--master-instance-type TYPE", "The type of the master instance to launch", :master_instance_type ],
      [ OptionWithArg, "--ami-version VERSION",       "The version of ami to launch the job flow with", :ami_version ],
      [ OptionWithArg, "--key-pair KEY_PAIR",         "The name of your Amazon EC2 Keypair", :key_pair ],
      [ OptionWithArg, "--availability-zone A_Z",     "Specify the Availability Zone in which to launch the job flow", :az ],
      [ OptionWithArg, "--info INFO",                 "Specify additional info to job flow creation", :ainfo ],
      [ OptionWithArg, "--hadoop-version INFO",       "Specify the Hadoop Version to install", :hadoop_version ],
      [ FlagOption,    "--plain-output",              "Return the job flow id from create step as simple text", :plain_output ],
      [ OptionWithArg, "--subnet EC2-SUBNET_ID",      "Specify the VPC subnet that you want to run in", :subnet_id ],
    ])
    commands.parse_command(CreateInstanceGroupCommand, "--instance-group ROLE", "Specify an instance group while creating a jobflow")
    commands.parse_options(["--instance-group", "--add-instance-group"], [
      [OptionWithArg, "--bid-price PRICE",        "The bid price for this instance group", :bid_price]
    ])

    opts.separator "\n  Passing arguments to steps\n"

    commands.parse_options(step_commands + ["--bootstrap-action", "--stream"], [
      [ ArgsOption,    "--args ARGS",                 "A comma separated list of arguments to pass to the step" ],
      [ ArgOption,     "--arg ARG",                   "An argument to pass to the step" ],
      [ OptionWithArg, "--step-name STEP_NAME",       "Set name for the step", :step_name ],
      [ OptionWithArg, "--step-action STEP_ACTION",   "Action to take when step finishes. One of CANCEL_AND_WAIT, TERMINATE_JOB_FLOW or CONTINUE", :step_action ],
    ])

    opts.separator "\n  Specific Steps\n"

    commands.parse_command(ResizeJobflowCommand, "--resize-jobflow",     "Add a step to resize the job flow")
    commands.parse_command(EnableDebuggingCommand, "--enable-debugging", "Enable job flow debugging (you must be signed up to SimpleDB for this to work)")

    opts.separator "\n  Adding Steps from a Json File to Job Flows\n"

    commands.parse_command(JsonStepCommand, "--json FILE", "Add a sequence of steps stored in the json file FILE")
    commands.parse_options(["--json"], [
      [ ParamOption, "--param VARIABLE=VALUE ARGS", "Substitute the string VARIABLE with the string VALUE in the json file", :variables ],
    ])

    opts.separator "\n  Pig Steps\n"

    commands.parse_command(PigScriptCommand,      "--pig-script [SCRIPT]",  "Add a step that runs a Pig script")
    commands.parse_command(PigInteractiveCommand, "--pig-interactive",  "Add a step that sets up the job flow for an interactive (via SSH) pig session")

    opts.separator "\n  Hive Steps\n"

    commands.parse_command(HiveScriptCommand, "--hive-script [SCRIPT]",      "Add a step that runs a Hive script")
    commands.parse_command(HiveInteractiveCommand, "--hive-interactive", "Add a step that sets up the job flow for an interactive (via SSH) hive session")
    commands.parse_command(HiveSiteCommand, "--hive-site HIVE_SITE", "Override Hive configuration with configuration from HIVE_SITE")
    commands.parse_options(["--hive-script", "--hive-interactive", "--hive-site"], [
      [ OptionWithArg,     "--hive-versions VERSIONS", "A comma separated list of Hive version", :hive_versions],
      [ OptionWithArg, "--step-action STEP_ACTION", "Action to take when step finishes. One of CANCEL_AND_WAIT, TERMINATE_JOB_FLOW or CONTINUE", :step_action ],
    ])

    opts.separator "\n  Adding Jar Steps to Job Flows\n"

    commands.parse_command(JarStepCommand, "--jar JAR", "Run a Hadoop Jar in a step")
    commands.parse_options(["--jar"], [
      [ OptionWithArg, "--main-class MAIN_CLASS",  "The main class of the jar", :main_class ]
    ])

    opts.separator "\n  Adding Streaming Steps to Job Flows\n"

    commands.parse_command(StreamStepCommand, "--stream", "Add a step that performs hadoop streaming")
    commands.parse_options(["--stream"], [
      [ OptionWithArg, "--input INPUT",               "Input to the steps, e.g. s3n://mybucket/input", :input],
      [ OptionWithArg, "--output OUTPUT",             "The output to the steps, e.g. s3n://mybucket/output", :output],
      [ OptionWithArg, "--mapper MAPPER",             "The mapper program or class", :mapper],
      [ OptionWithArg, "--cache CACHE_FILE",          "A file to load into the cache, e.g. s3n://mybucket/sample.py#sample.py", :cache ],
      [ OptionWithArg, "--cache-archive CACHE_FILE",  "A file to unpack into the cache, e.g. s3n://mybucket/sample.jar", :cache_archive, ],
      [ OptionWithArg, "--jobconf KEY=VALUE",         "Specify jobconf arguments to pass to streaming, e.g. mapred.task.timeout=800000", :jobconf],
      [ OptionWithArg, "--reducer REDUCER",           "The reducer program or class", :reducer],
    ])

    opts.separator "\n  Adding and Modifying Instance Groups\n"

    commands.parse_command(ModifyInstanceGroupCommand, "--modify-instance-group INSTANCE_GROUP", "Modify an existing instance group")
    commands.parse_command(AddInstanceGroupCommand,    "--add-instance-group ROLE", "Add an instance group to an existing jobflow")
    commands.parse_command(UnarrestInstanceGroupCommand, "--unarrest-instance-group ROLE", "Unarrest an instance group of the supplied jobflow")
    commands.parse_options(["--instance-group", "--modify-instance-group", "--add-instance-group", "--create"], [
     [ InstanceCountOption, "--instance-count INSTANCE_COUNT", "Set the instance count of an instance group", :instance_count ]
    ])
    commands.parse_options(["--instance-group", "--add-instance-group", "--create"], [
     [ InstanceTypeOption,  "--instance-type INSTANCE_TYPE", "Set the instance type of an instance group", :instance_type ],
    ])

    opts.separator "\n  Contacting the Master Node\n"

#    commands.parse_options(["--ssh", "--scp", "--eip"], [
#      [ FlagOption,    "--no-wait",    "Don't wait for the Master node to start before executing scp or ssh or assigning EIP", :no_wait ],
#    ])

    commands.parse_command(SSHCommand, "--ssh [COMMAND]", "SSH to the master node and optionally run a command")
    commands.parse_command(PutCommand, "--put SRC", "Copy a file to the job flow using scp")
    commands.parse_command(GetCommand, "--get SRC", "Copy a file from the job flow using scp")
    commands.parse_command(PutCommand, "--scp SRC", "Copy a file to the job flow using scp")

    commands.parse_options(["--get", "--put", "--scp"], [
      [ OptionWithArg, "--to DEST",    "Destination location when copying files", :dest ],
    ])

    commands.parse_command(LogsCommand, "--logs", "Display the step logs for the last executed step")

    opts.separator "\n  Assigning Elastic IP to Master Node\n"

    commands.parse_command(EipCommand, "--eip [ElasticIP]", "Associate ElasticIP to master node. If no ElasticIP is specified, allocate and associate a new one.")

    opts.separator "\n  Settings common to all step types\n"

    commands.parse_options(["--ssh", "--scp", "--eip"], [
      [ FlagOption,   "--no-wait",    "Don't wait for the Master node to start before executing scp or ssh or assigning EIP", :no_wait ],
      [ GlobalOption, "--key-pair-file FILE_PATH",   "Path to your local pem file for your EC2 key pair", :key_pair_file ],
    ])

    opts.separator "\n  Specifying Bootstrap Actions\n"

    commands.parse_command(BootstrapActionCommand, "--bootstrap-action SCRIPT", "Run a bootstrap action script on all instances")
    commands.parse_options(["--bootstrap-action"], [
      [ OptionWithArg, "--bootstrap-name NAME",    "Set the name of the bootstrap action", :bootstrap_name ],
    ])


    opts.separator "\n  Listing and Describing Job flows\n"

    commands.parse_command(ListActionCommand, "--list", "List all job flows created in the last 2 days")
    commands.parse_command(DescribeActionCommand, "--describe", "Dump a JSON description of the supplied job flows")
    commands.parse_command(PrintHiveVersionCommand, "--print-hive-version", "Prints the version of Hive that's currently active on the job flow")
    commands.parse_options(["--list", "--describe"], [
      # The help text used to be a copy of the --bootstrap-name description.
      [ OptionWithArg, "--state NAME",   "Only list job flows in the given state", :state ],
      [ FlagOption,    "--active",       "List running, starting or shutting down job flows", :active ],
      [ FlagOption,    "--all",          "List all job flows in the last 2 weeks", :all ],
      [ OptionWithArg,    "--created-after=DATETIME", "List all jobflows created after DATETIME (xml date time format)", :created_after],
      [ OptionWithArg,    "--created-before=DATETIME", "List all jobflows created before DATETIME (xml date time format)", :created_before],
      [ FlagOption,    "--no-steps",     "Do not list steps when listing jobs", :no_steps ],
    ])

    opts.separator "\n  Terminating Job Flows\n"

    commands.parse_command(SetTerminationProtection, "--set-termination-protection BOOL", "Enable or disable job flow termination protection. Either true or false")

    commands.parse_command(TerminateActionCommand, "--terminate", "Terminate job flows")

    opts.separator "\n  Common Options\n"

    commands.parse_options(["--jobflow", "--describe"], [
      [ GlobalOption, "--jobflow JOB_FLOW_ID",  "The job flow to act on", :jobflow, /^j-[A-Z0-9]+$/],
    ])

    commands.parse_options(:global, [
      [ GlobalFlagOption, "--verbose",  "Turn on verbose logging of program interaction", :verbose ],
      [ GlobalFlagOption, "--trace",    "Trace commands made to the webservice", :trace ],
      [ GlobalOption, "--credentials CRED_FILE",  "File containing access-id and private-key", :credentials],
      [ GlobalOption, "--access-id ACCESS_ID",  "AWS Access Id", :aws_access_id],
      [ GlobalOption, "--private-key PRIVATE_KEY",  "AWS Private Key", :aws_secret_key],
      [ GlobalOption, "--log-uri LOG_URI",  "Location in S3 to store logs from the job flow, e.g. s3n://mybucket/logs", :log_uri ],
    ])
    commands.parse_command(VersionCommand, "--version", "Print version string")
    commands.parse_command(HelpCommand, "--help", "Show help message")

    opts.separator "\n  Uncommon Options\n"

    commands.parse_options(:global, [
      [ GlobalFlagOption, "--debug",  "Print stack traces when exceptions occur", :debug],
      # The help text used to be a copy of the --credentials description.
      [ GlobalOption,     "--endpoint ENDPOINT",  "URL of the Elastic MapReduce web service endpoint to connect to", :endpoint],
      [ GlobalOption,     "--region REGION",  "The region to use for the endpoint", :region],
      [ GlobalOption,     "--apps-path APPS_PATH",  "Specify s3:// path to the base of the emr public bucket to use. e.g s3://us-east-1.elasticmapreduce", :apps_path],
      [ GlobalOption,     "--beta-path BETA_PATH",  "Specify s3:// path to the base of the emr public bucket to use for beta apps. e.g s3://beta.elasticmapreduce", :beta_path],
    ])

    opts.separator "\n  Short Options\n"
    commands.parse_command(HelpCommand, "-h", "Show help message")
    commands.parse_options(:global, [
      [ GlobalFlagOption, "-v", "Turn on verbose logging of program interaction", :verbose ],
      [ GlobalOption, "-c CRED_FILE",  "File containing access-id and private-key", :credentials ],
      [ GlobalOption, "-a ACCESS_ID",  "AWS Access Id", :aws_access_id],
      [ GlobalOption, "-p PRIVATE_KEY",  "AWS Private Key", :aws_secret_key],
      [ GlobalOption, "-j JOB_FLOW_ID",  "The job flow to act on", :jobflow, /^j-[A-Z0-9]+$/],
    ])

  end
create_and_execute_commands(args, client_class, logger, executor, exit_on_error=true) click to toggle source
# File lib/commands.rb, line 1678
# Parses +args+ into commands, validates them and enacts them against an EMR client.
#
# args::          command line arguments; option switches are consumed by the parser
# client_class::  handed to EmrClient.new together with the commands and logger
# logger::        receives error output; its level is set to :trace when --trace is given
# executor::      handed to Commands.new
# exit_on_error:: when true a RuntimeError ends the process with exit(-1),
#                 otherwise the error is re-raised after being logged
#
# Returns the commands object.
def self.create_and_execute_commands(args, client_class, logger, executor, exit_on_error=true)
  commands = Commands.new(logger, executor)

  begin
    parser = OptionParser.new { |option_parser| add_commands(commands, option_parser) }
    parser.parse!(args)

    logger.level = :trace if commands.get_field(:trace)

    commands.parse_jobflows(args)

    # With nothing to do, fall back to printing the help text.
    if commands.commands.size == 0
      commands.commands << HelpCommand.new("--help", "Print help text", nil, commands)
    end

    credentials = Credentials.new(commands)
    credentials_file = commands.get_field(:credentials, "credentials.json")
    credentials.parse_credentials(credentials_file, commands.global_options)

    work_out_globals(commands)
    fold_commands(commands)
    commands.validate
    commands.enact(EmrClient.new(commands, logger, client_class))
  rescue RuntimeError => error
    logger.puts("Error: " + error.message)
    logger.puts(error.backtrace.join("\n")) if commands.get_field(:trace)
    exit(-1) if exit_on_error
    raise error
  end

  commands
end
fold_commands(commands) click to toggle source

This function pulls out steps if there is a create command that precedes them.

# File lib/commands.rb, line 1624
# Folds child commands (steps, bootstrap actions, instance groups) into the
# create-style command that precedes them and stores the folded list in
# commands.commands.
#
# * Steps with no preceding create command are collected in a new
#   AddJobFlowStepsCommand, which takes the place of the first step.
# * Bootstrap actions and --instance-group need a preceding --create.
# * Instance group commands stay in the list as standalone commands when the
#   preceding command cannot absorb them.
#
# Raises RuntimeError when a child command appears where it is not allowed.
def self.fold_commands(commands)
  last_create_command = nil
  new_commands = []

  commands.each do |cmd|
    if cmd.is_a?(CreateJobFlowCommand)
      last_create_command = cmd
      new_commands << cmd
      next
    end

    unless is_create_child_command(cmd)
      new_commands << cmd
      next
    end

    if last_create_command.nil?
      if cmd.is_a?(StepCommand)
        last_create_command = AddJobFlowStepsCommand.new(
          "--add-steps", "Add job flow steps", nil, commands
        )
        new_commands << last_create_command
      elsif cmd.is_a?(BootstrapActionCommand) || cmd.is_a?(CreateInstanceGroupCommand)
        raise RuntimeError, "the option #{cmd.name} must come after the --create option"
      elsif cmd.is_a?(AddInstanceGroupCommand)
        new_commands << cmd
        next
      else
        next
      end
    end

    if cmd.is_a?(StepCommand)
      unless last_create_command.respond_to?(:add_step_command)
        last_create_command = AddJobFlowStepsCommand.new(
          "--add-steps", "Add job flow steps", nil, commands
        )
        # The replacement container must be part of the result, otherwise the
        # steps added to it are silently dropped.
        new_commands << last_create_command
      end
      last_create_command.add_step_command(cmd)
    elsif cmd.is_a?(BootstrapActionCommand)
      unless last_create_command.respond_to?(:add_bootstrap_command)
        raise RuntimeError, "Bootstrap actions must follow a --create command"
      end
      last_create_command.add_bootstrap_command(cmd)
    elsif cmd.is_a?(CreateInstanceGroupCommand) || cmd.is_a?(AddInstanceGroupCommand)
      if last_create_command.respond_to?(:add_instance_group_command)
        last_create_command.add_instance_group_command(cmd)
      else
        new_commands << cmd
      end
    else
      raise RuntimeError, "Unknown child command #{cmd.name} following #{last_create_command.name}"
    end
  end

  commands.commands = new_commands
end
is_create_child_command(cmd) click to toggle source
# File lib/commands.rb, line 1616
# Tells whether +cmd+ is one of the command kinds that fold_commands treats as
# a child of a create-style command: a step, a bootstrap action or an
# instance group command.
def self.is_create_child_command(cmd)
  case cmd
  when StepCommand, BootstrapActionCommand, AddInstanceGroupCommand, CreateInstanceGroupCommand
    true
  else
    false
  end
end
work_out_globals(commands) click to toggle source
# File lib/commands.rb, line 1720
# Derives and normalizes the endpoint related global options.
#
# * --region is turned into the endpoint URL
#   "https://REGION.elasticmapreduce.amazonaws.com".
# * When an endpoint is set and no --apps-path was given, the apps path is
#   derived from the host prefix in front of ".elasticmapreduce".
# * :apps_path and :beta_path get defaults and lose one trailing "/".
#
# commands:: must provide +global_options+ (a hash), +have+ and +get_field+.
#
# Raises RuntimeError when --region and --endpoint are both given.
def self.work_out_globals(commands)
  options = commands.global_options

  if commands.have(:region)
    if commands.have(:endpoint)
      raise RuntimeError, "You may not specify --region together with --endpoint"
    end

    endpoint = "https://#{options[:region]}.elasticmapreduce.amazonaws.com"
    commands.global_options[:endpoint] = endpoint
  end

  if commands.have(:endpoint)
    # A regexp literal is required here: inside a double quoted string "\."
    # collapses to ".", which matches any character instead of a literal dot.
    region_match = commands.get_field(:endpoint).match(%r{\Ahttps?://(.*)\.elasticmapreduce})
    if !commands.have(:apps_path) && region_match
      options[:apps_path] = "s3://#{region_match[1]}.elasticmapreduce"
    end
  end

  options[:apps_path] ||= "s3://us-east-1.elasticmapreduce"
  options[:beta_path] ||= "s3://beta.elasticmapreduce"
  [:apps_path, :beta_path].each do |key|
    # Non-destructive chomp: the stored value may be frozen or shared with the caller.
    options[key] = options[key].chomp("/")
  end
end