module Commands
Constants
- ELASTIC_MAPREDUCE_CLIENT_VERSION
Public Class Methods
add_commands(commands, opts)
click to toggle source
# File lib/commands.rb, line 1422
#
# Registers every command and option the command-line client understands.
#
# Each commands.parse_command call registers a command class under a switch.
# Each commands.parse_options call attaches option descriptors to the listed
# switches, or to :global. Each descriptor is an array of the form
# [option class, switch spec, help text, field key, optional validation
# pattern]. The opts.separator calls insert the section headings that group
# the options in the help output.
#
# commands - the Commands collection being configured; opts is stored on it
#            as commands.opts.
# opts     - the OptionParser being built.
def self.add_commands(commands, opts)
  # FIXME: add --wait-for-step function
  commands.opts = opts

  # Switches that add job-flow steps; they share the step-argument options
  # registered under "Passing arguments to steps".
  step_commands = %w[--jar --resize-jobflow --enable-debugging --hive-interactive
                     --pig-interactive --hive-script --pig-script]

  # Anchored with \A/\z so that an argument containing a newline
  # (e.g. "j-ABC\n...") is rejected; ^/$ would only anchor a single line.
  jobflow_id_pattern = /\Aj-[A-Z0-9]+\z/

  opts.separator "\n Creating Job Flows\n"
  commands.parse_command(CreateJobFlowCommand, "--create", "Create a new job flow")
  commands.parse_options(["--create"], [
    [ OptionWithArg, "--name NAME", "The name of the job flow being created", :jobflow_name ],
    [ FlagOption, "--alive", "Create a job flow that stays running even though it has executed all its steps", :alive ],
    [ OptionWithArg, "--with-termination-protection", "Create a job with termination protection (default is no termination protection)", :with_termination_protection ],
    [ OptionWithArg, "--with-supported-products PRODUCTS", "Add supported products", :with_supported_products ],
    [ OptionWithArg, "--num-instances NUM", "Number of instances in the job flow", :instance_count ],
    [ OptionWithArg, "--slave-instance-type TYPE", "The type of the slave instances to launch", :slave_instance_type ],
    [ OptionWithArg, "--master-instance-type TYPE", "The type of the master instance to launch", :master_instance_type ],
    [ OptionWithArg, "--ami-version VERSION", "The version of ami to launch the job flow with", :ami_version ],
    [ OptionWithArg, "--key-pair KEY_PAIR", "The name of your Amazon EC2 Keypair", :key_pair ],
    [ OptionWithArg, "--availability-zone A_Z", "Specify the Availability Zone in which to launch the job flow", :az ],
    [ OptionWithArg, "--info INFO", "Specify additional info to job flow creation", :ainfo ],
    [ OptionWithArg, "--hadoop-version INFO", "Specify the Hadoop Version to install", :hadoop_version ],
    [ FlagOption, "--plain-output", "Return the job flow id from create step as simple text", :plain_output ],
    [ OptionWithArg, "--subnet EC2-SUBNET_ID", "Specify the VPC subnet that you want to run in", :subnet_id ],
  ])

  commands.parse_command(CreateInstanceGroupCommand, "--instance-group ROLE", "Specify an instance group while creating a jobflow")
  commands.parse_options(["--instance-group", "--add-instance-group"], [
    [ OptionWithArg, "--bid-price PRICE", "The bid price for this instance group", :bid_price ]
  ])

  opts.separator "\n Passing arguments to steps\n"
  commands.parse_options(step_commands + ["--bootstrap-action", "--stream"], [
    # Help text previously read "command separated" (typo for "comma").
    [ ArgsOption, "--args ARGS", "A comma separated list of arguments to pass to the step" ],
    [ ArgOption, "--arg ARG", "An argument to pass to the step" ],
    [ OptionWithArg, "--step-name STEP_NAME", "Set name for the step", :step_name ],
    [ OptionWithArg, "--step-action STEP_ACTION", "Action to take when step finishes. One of CANCEL_AND_WAIT, TERMINATE_JOB_FLOW or CONTINUE", :step_action ],
  ])

  opts.separator "\n Specific Steps\n"
  commands.parse_command(ResizeJobflowCommand, "--resize-jobflow", "Add a step to resize the job flow")
  commands.parse_command(EnableDebuggingCommand, "--enable-debugging", "Enable job flow debugging (you must be signed up to SimpleDB for this to work)")

  opts.separator "\n Adding Steps from a Json File to Job Flows\n"
  commands.parse_command(JsonStepCommand, "--json FILE", "Add a sequence of steps stored in the json file FILE")
  commands.parse_options(["--json"], [
    [ ParamOption, "--param VARIABLE=VALUE ARGS", "Substitute the string VARIABLE with the string VALUE in the json file", :variables ],
  ])

  opts.separator "\n Pig Steps\n"
  commands.parse_command(PigScriptCommand, "--pig-script [SCRIPT]", "Add a step that runs a Pig script")
  commands.parse_command(PigInteractiveCommand, "--pig-interactive", "Add a step that sets up the job flow for an interactive (via SSH) pig session")

  opts.separator "\n Hive Steps\n"
  commands.parse_command(HiveScriptCommand, "--hive-script [SCRIPT]", "Add a step that runs a Hive script")
  commands.parse_command(HiveInteractiveCommand, "--hive-interactive", "Add a step that sets up the job flow for an interactive (via SSH) hive session")
  commands.parse_command(HiveSiteCommand, "--hive-site HIVE_SITE", "Override Hive configuration with configuration from HIVE_SITE")
  # --step-action is repeated here so that --hive-site, which is not in
  # step_commands, also accepts it.
  commands.parse_options(["--hive-script", "--hive-interactive", "--hive-site"], [
    [ OptionWithArg, "--hive-versions VERSIONS", "A comma separated list of Hive versions", :hive_versions ],
    [ OptionWithArg, "--step-action STEP_ACTION", "Action to take when step finishes. One of CANCEL_AND_WAIT, TERMINATE_JOB_FLOW or CONTINUE", :step_action ],
  ])

  opts.separator "\n Adding Jar Steps to Job Flows\n"
  commands.parse_command(JarStepCommand, "--jar JAR", "Run a Hadoop Jar in a step")
  commands.parse_options(["--jar"], [
    [ OptionWithArg, "--main-class MAIN_CLASS", "The main class of the jar", :main_class ]
  ])

  opts.separator "\n Adding Streaming Steps to Job Flows\n"
  commands.parse_command(StreamStepCommand, "--stream", "Add a step that performs hadoop streaming")
  commands.parse_options(["--stream"], [
    [ OptionWithArg, "--input INPUT", "Input to the steps, e.g. s3n://mybucket/input", :input ],
    [ OptionWithArg, "--output OUTPUT", "The output to the steps, e.g. s3n://mybucket/output", :output ],
    [ OptionWithArg, "--mapper MAPPER", "The mapper program or class", :mapper ],
    [ OptionWithArg, "--cache CACHE_FILE", "A file to load into the cache, e.g. s3n://mybucket/sample.py#sample.py", :cache ],
    [ OptionWithArg, "--cache-archive CACHE_FILE", "A file to unpack into the cache, e.g. s3n://mybucket/sample.jar", :cache_archive ],
    [ OptionWithArg, "--jobconf KEY=VALUE", "Specify jobconf arguments to pass to streaming, e.g. mapred.task.timeout=800000", :jobconf ],
    [ OptionWithArg, "--reducer REDUCER", "The reducer program or class", :reducer ],
  ])

  opts.separator "\n Adding and Modifying Instance Groups\n"
  commands.parse_command(ModifyInstanceGroupCommand, "--modify-instance-group INSTANCE_GROUP", "Modify an existing instance group")
  commands.parse_command(AddInstanceGroupCommand, "--add-instance-group ROLE", "Add an instance group to an existing jobflow")
  commands.parse_command(UnarrestInstanceGroupCommand, "--unarrest-instance-group ROLE", "Unarrest an instance group of the supplied jobflow")
  commands.parse_options(["--instance-group", "--modify-instance-group", "--add-instance-group", "--create"], [
    [ InstanceCountOption, "--instance-count INSTANCE_COUNT", "Set the instance count of an instance group", :instance_count ]
  ])
  commands.parse_options(["--instance-group", "--add-instance-group", "--create"], [
    [ InstanceTypeOption, "--instance-type INSTANCE_TYPE", "Set the instance type of an instance group", :instance_type ],
  ])

  opts.separator "\n Contacting the Master Node\n"
  commands.parse_command(SSHCommand, "--ssh [COMMAND]", "SSH to the master node and optionally run a command")
  commands.parse_command(PutCommand, "--put SRC", "Copy a file to the job flow using scp")
  commands.parse_command(GetCommand, "--get SRC", "Copy a file from the job flow using scp")
  # --scp is registered as an alias of --put (same command class).
  commands.parse_command(PutCommand, "--scp SRC", "Copy a file to the job flow using scp")
  commands.parse_options(["--get", "--put", "--scp"], [
    [ OptionWithArg, "--to DEST", "Destination location when copying files", :dest ],
  ])
  commands.parse_command(LogsCommand, "--logs", "Display the step logs for the last executed step")

  opts.separator "\n Assigning Elastic IP to Master Node\n"
  commands.parse_command(EipCommand, "--eip [ElasticIP]", "Associate ElasticIP to master node. If no ElasticIP is specified, allocate and associate a new one.")

  opts.separator "\n Settings common to all step types\n"
  commands.parse_options(["--ssh", "--scp", "--eip"], [
    [ FlagOption, "--no-wait", "Don't wait for the Master node to start before executing scp or ssh or assigning EIP", :no_wait ],
    [ GlobalOption, "--key-pair-file FILE_PATH", "Path to your local pem file for your EC2 key pair", :key_pair_file ],
  ])

  opts.separator "\n Specifying Bootstrap Actions\n"
  commands.parse_command(BootstrapActionCommand, "--bootstrap-action SCRIPT", "Run a bootstrap action script on all instances")
  commands.parse_options(["--bootstrap-action"], [
    [ OptionWithArg, "--bootstrap-name NAME", "Set the name of the bootstrap action", :bootstrap_name ],
  ])

  opts.separator "\n Listing and Describing Job flows\n"
  commands.parse_command(ListActionCommand, "--list", "List all job flows created in the last 2 days")
  commands.parse_command(DescribeActionCommand, "--describe", "Dump a JSON description of the supplied job flows")
  commands.parse_command(PrintHiveVersionCommand, "--print-hive-version", "Prints the version of Hive that's currently active on the job flow")
  commands.parse_options(["--list", "--describe"], [
    # Help text was previously copy-pasted from --bootstrap-name.
    [ OptionWithArg, "--state NAME", "Restrict --list/--describe to job flows in state NAME", :state ],
    [ FlagOption, "--active", "List running, starting or shutting down job flows", :active ],
    [ FlagOption, "--all", "List all job flows in the last 2 weeks", :all ],
    [ OptionWithArg, "--created-after=DATETIME", "List all jobflows created after DATETIME (xml date time format)", :created_after ],
    [ OptionWithArg, "--created-before=DATETIME", "List all jobflows created before DATETIME (xml date time format)", :created_before ],
    [ FlagOption, "--no-steps", "Do not list steps when listing jobs", :no_steps ],
  ])

  opts.separator "\n Terminating Job Flows\n"
  commands.parse_command(SetTerminationProtection, "--set-termination-protection BOOL", "Enable or disable job flow termination protection. Either true or false")
  commands.parse_command(TerminateActionCommand, "--terminate", "Terminate job flows")

  opts.separator "\n Common Options\n"
  commands.parse_options(["--jobflow", "--describe"], [
    [ GlobalOption, "--jobflow JOB_FLOW_ID", "The job flow to act on", :jobflow, jobflow_id_pattern ],
  ])
  commands.parse_options(:global, [
    [ GlobalFlagOption, "--verbose", "Turn on verbose logging of program interaction", :verbose ],
    [ GlobalFlagOption, "--trace", "Trace commands made to the webservice", :trace ],
    [ GlobalOption, "--credentials CRED_FILE", "File containing access-id and private-key", :credentials ],
    [ GlobalOption, "--access-id ACCESS_ID", "AWS Access Id", :aws_access_id ],
    [ GlobalOption, "--private-key PRIVATE_KEY", "AWS Private Key", :aws_secret_key ],
    [ GlobalOption, "--log-uri LOG_URI", "Location in S3 to store logs from the job flow, e.g. s3n://mybucket/logs", :log_uri ],
  ])
  commands.parse_command(VersionCommand, "--version", "Print version string")
  commands.parse_command(HelpCommand, "--help", "Show help message")

  opts.separator "\n Uncommon Options\n"
  commands.parse_options(:global, [
    [ GlobalFlagOption, "--debug", "Print stack traces when exceptions occur", :debug ],
    # Help text was previously copy-pasted from --credentials.
    [ GlobalOption, "--endpoint ENDPOINT", "The webservice endpoint to talk to", :endpoint ],
    [ GlobalOption, "--region REGION", "The region to use for the endpoint", :region ],
    [ GlobalOption, "--apps-path APPS_PATH", "Specify s3:// path to the base of the emr public bucket to use. e.g s3://us-east-1.elasticmapreduce", :apps_path ],
    [ GlobalOption, "--beta-path BETA_PATH", "Specify s3:// path to the base of the emr public bucket to use for beta apps. e.g s3://beta.elasticmapreduce", :beta_path ],
  ])

  opts.separator "\n Short Options\n"
  commands.parse_command(HelpCommand, "-h", "Show help message")
  commands.parse_options(:global, [
    [ GlobalFlagOption, "-v", "Turn on verbose logging of program interaction", :verbose ],
    [ GlobalOption, "-c CRED_FILE", "File containing access-id and private-key", :credentials ],
    [ GlobalOption, "-a ACCESS_ID", "AWS Access Id", :aws_access_id ],
    [ GlobalOption, "-p PRIVATE_KEY", "AWS Private Key", :aws_secret_key ],
    [ GlobalOption, "-j JOB_FLOW_ID", "The job flow to act on", :jobflow, jobflow_id_pattern ],
  ])
end
create_and_execute_commands(args, client_class, logger, executor, exit_on_error=true)
click to toggle source
# File lib/commands.rb, line 1678
#
# Top-level driver for the command-line client. It:
#
# 1. builds an OptionParser populated by add_commands and parses +args+;
# 2. resolves job flow ids, credentials and derived global settings;
# 3. folds child commands (steps, bootstrap actions, instance groups) into
#    their parents;
# 4. validates the result and enacts it against an EmrClient.
#
# args          - argument array; OptionParser#parse! removes the options it
#                 consumes and the remainder goes to parse_jobflows.
# client_class  - forwarded to EmrClient.new.
# logger        - receives error output; its level is set to :trace when the
#                 :trace field is set.
# executor      - forwarded to Commands.new.
# exit_on_error - on a RuntimeError, true prints the message and calls
#                 exit(-1); false prints the message and re-raises.
#
# Returns the Commands instance.
def self.create_and_execute_commands(args, client_class, logger, executor, exit_on_error=true)
  commands = Commands.new(logger, executor)

  begin
    parser = OptionParser.new { |option_parser| add_commands(commands, option_parser) }
    parser.parse!(args)

    logger.level = :trace if commands.get_field(:trace)

    commands.parse_jobflows(args)

    # Nothing requested: fall back to showing the help text.
    if commands.commands.size.zero?
      commands.commands << HelpCommand.new("--help", "Print help text", nil, commands)
    end

    Credentials.new(commands).parse_credentials(
      commands.get_field(:credentials, "credentials.json"),
      commands.global_options
    )

    work_out_globals(commands)
    fold_commands(commands)
    commands.validate

    commands.enact(EmrClient.new(commands, logger, client_class))
  rescue RuntimeError => e
    logger.puts("Error: #{e.message}")
    logger.puts(e.backtrace.join("\n")) if commands.get_field(:trace)
    raise e unless exit_on_error
    exit(-1)
  end

  commands
end
fold_commands(commands)
click to toggle source
This function pulls steps (as well as bootstrap actions and instance groups) out of the command list and attaches them to the create command that precedes them.
# File lib/commands.rb, line 1624
#
# Rewrites commands.commands so that child commands (steps, bootstrap
# actions, instance groups) are attached to the command that precedes them
# instead of standing alone.
#
# * A CreateJobFlowCommand becomes the current parent and stays in the list.
# * A step with no parent gets a synthesized AddJobFlowStepsCommand
#   ("--add-steps"). That command is placed in the list where the step
#   appeared and becomes the parent for later steps.
# * A bootstrap action or --instance-group with no parent raises.
# * An --add-instance-group with no parent stays in the list on its own.
# * A child whose parent cannot accept it: a step starts a new --add-steps
#   command, an instance group stays in the list on its own, and a bootstrap
#   action raises.
#
# Raises RuntimeError for misplaced bootstrap actions or instance groups, or
# for a child command kind it cannot attach.
def self.fold_commands(commands)
  parent = nil
  folded = []

  commands.each do |cmd|
    if cmd.is_a?(CreateJobFlowCommand)
      parent = cmd
      folded << cmd
      next
    end

    unless is_create_child_command(cmd)
      folded << cmd
      next
    end

    if parent.nil?
      if cmd.is_a?(StepCommand)
        parent = AddJobFlowStepsCommand.new("--add-steps", "Add job flow steps", nil, commands)
        folded << parent
      elsif cmd.is_a?(BootstrapActionCommand) || cmd.is_a?(CreateInstanceGroupCommand)
        raise RuntimeError, "the option #{cmd.name} must come after the --create option"
      elsif cmd.is_a?(AddInstanceGroupCommand)
        folded << cmd
        next
      else
        next
      end
    end

    if cmd.is_a?(StepCommand)
      unless parent.respond_to?(:add_step_command)
        # The current parent cannot hold steps, so start a new --add-steps
        # command. It must also be added to the output list; otherwise the
        # steps attached to it would be silently dropped.
        parent = AddJobFlowStepsCommand.new("--add-steps", "Add job flow steps", nil, commands)
        folded << parent
      end
      parent.add_step_command(cmd)
    elsif cmd.is_a?(BootstrapActionCommand)
      unless parent.respond_to?(:add_bootstrap_command)
        raise RuntimeError, "Bootstrap actions must follow a --create command"
      end
      parent.add_bootstrap_command(cmd)
    elsif cmd.is_a?(CreateInstanceGroupCommand) || cmd.is_a?(AddInstanceGroupCommand)
      if parent.respond_to?(:add_instance_group_command)
        parent.add_instance_group_command(cmd)
      else
        folded << cmd
      end
    else
      raise RuntimeError, "Unknown child command #{cmd.name} following #{parent.name}"
    end
  end

  commands.commands = folded
end
is_create_child_command(cmd)
click to toggle source
# File lib/commands.rb, line 1616
#
# Predicate used by fold_commands: true when +cmd+ is a step, a bootstrap
# action, or an instance-group command (added or created), i.e. a command
# that is attached to a preceding parent command rather than run on its
# own. False for anything else, including nil.
def self.is_create_child_command(cmd)
  case cmd
  when StepCommand, BootstrapActionCommand, AddInstanceGroupCommand, CreateInstanceGroupCommand
    true
  else
    false
  end
end
work_out_globals(commands)
click to toggle source
# File lib/commands.rb, line 1720
#
# Fills in global options that are derived from other global options:
#
# * --region REGION sets :endpoint to
#   "https://REGION.elasticmapreduce.amazonaws.com". It may not be combined
#   with --endpoint.
# * If an endpoint of the form http(s)://NAME.elasticmapreduce... is set and
#   --apps-path was not given, :apps_path becomes
#   "s3://NAME.elasticmapreduce".
# * :apps_path and :beta_path default to "s3://us-east-1.elasticmapreduce"
#   and "s3://beta.elasticmapreduce", and one trailing "/" is removed.
#
# Raises RuntimeError when both --region and --endpoint are given.
def self.work_out_globals(commands)
  options = commands.global_options

  if commands.have(:region)
    if commands.have(:endpoint)
      raise RuntimeError, "You may not specify --region together with --endpoint"
    end
    options[:endpoint] = "https://#{options[:region]}.elasticmapreduce.amazonaws.com"
  end

  if commands.have(:endpoint) && !commands.have(:apps_path)
    # A regexp literal keeps "\." as a literal dot. The former double-quoted
    # string pattern lost the backslash, so the dot matched any character,
    # and its "https*" also accepted "httpss".
    region_match = commands.get_field(:endpoint).match(%r{\Ahttps?://(.*)\.elasticmapreduce})
    options[:apps_path] = "s3://#{region_match[1]}.elasticmapreduce" if region_match
  end

  options[:apps_path] ||= "s3://us-east-1.elasticmapreduce"
  options[:beta_path] ||= "s3://beta.elasticmapreduce"

  # Non-mutating chomp: does not fail on frozen strings and does not alter a
  # string object the caller may still hold.
  [:apps_path, :beta_path].each { |key| options[key] = options[key].chomp("/") }
end