class Commands::CreateJobFlowCommand

Constants

OLD_OPTIONS

Attributes

ainfo[RW]
alive[RW]
ami_version[RW]
az[RW]
bootstrap_commands[RW]
hadoop_version[RW]
instance_count[RW]
instance_group_commands[RW]
instance_type[RW]
jobflow_name[RW]
key_pair[RW]
key_pair_file[RW]
log_uri[RW]
master_instance_type[RW]
plain_output[RW]
slave_instance_type[RW]
subnet_id[RW]
with_supported_products[RW]
with_termination_protection[RW]

Public Class Methods

new(*args) click to toggle source
Calls superclass method Commands::StepProcessingCommand::new
# File lib/commands.rb, line 799
def initialize(*args)
  super(*args)
  @instance_group_commands = []
  @bootstrap_commands = []
end

Public Instance Methods

add_bootstrap_command(bootstrap_command) click to toggle source
# File lib/commands.rb, line 809
def add_bootstrap_command(bootstrap_command)
  @bootstrap_commands << bootstrap_command
end
add_instance_group_command(instance_group_command) click to toggle source
# File lib/commands.rb, line 813
def add_instance_group_command(instance_group_command)
  @instance_group_commands << instance_group_command
end
add_step_command(step) click to toggle source
# File lib/commands.rb, line 805
def add_step_command(step)
  @step_commands << step
end
apply_jobflow_option(field_symbol, *keys) click to toggle source
# File lib/commands.rb, line 867
def apply_jobflow_option(field_symbol, *keys)
  value = get_field(field_symbol)
  if value != nil then 
    map = @jobflow
    for key in keys[0..-2] do
      nmap = map[key]
      if nmap == nil then
        map[key] = {}
        nmap = map[key]
      end
      map = nmap
    end
    map[keys.last] = value
  end
end
create_jobflow() click to toggle source
# File lib/commands.rb, line 921
def create_jobflow
  @jobflow = {
    "Name"   => get_field(:jobflow_name, default_job_flow_name),
    "Instances" => {
      "KeepJobFlowAliveWhenNoSteps" => (get_field(:alive) ? "true" : "false"),
      "TerminationProtected"        => (get_field(:with_termination_protection) ? "true" : "false"),
      "InstanceGroups" => []
    },
    "Steps" => [],
    "BootstrapActions" => []
  }
  products_string = get_field(:with_supported_products)
  if products_string then
    products = products_string.split(/,/).map { |s| s.strip }
    @jobflow["SupportedProducts"] = products
  end
  @jobflow
end
default_hadoop_version() click to toggle source

FIXME: add code to setup collapse instance group commands

# File lib/commands.rb, line 791
def default_hadoop_version
  if get_field(:ami_version) == "1.0" then
    "0.20"
  else
    "0.20.205"
  end
end
default_job_flow_name() click to toggle source
# File lib/commands.rb, line 940
def default_job_flow_name
  name = "Development Job Flow"
  if get_field(:alive) then
    name += " (requires manual termination)"
  end
  return name
end
enact(client) click to toggle source
# File lib/commands.rb, line 830
def enact(client)
  @jobflow = create_jobflow

  apply_jobflow_option(:ainfo, "AdditionalInfo")
  apply_jobflow_option(:key_pair, "Instances", "Ec2KeyName")
  apply_jobflow_option(:hadoop_version, "Instances", "HadoopVersion")
  apply_jobflow_option(:az, "Instances", "Placement", "AvailabilityZone")
  apply_jobflow_option(:log_uri, "LogUri")
  apply_jobflow_option(:ami_version, "AmiVersion")
  apply_jobflow_option(:subnet_id, "Instances", "Ec2SubnetId")
 
  @jobflow["AmiVersion"] ||= "latest"

  self.step_commands = reorder_steps(@jobflow, self.step_commands)
  @jobflow["Steps"] = step_commands.map { |x| x.steps }.flatten

  setup_instance_groups
  @jobflow["Instances"]["InstanceGroups"] = instance_group_commands.map { |x| x.instance_group }

  bootstrap_action_index = 1
  for bootstrap_action_command in bootstrap_commands do
    @jobflow["BootstrapActions"] << bootstrap_action_command.bootstrap_action(
      bootstrap_action_index)
    bootstrap_action_index += 1
  end

  run_result = client.run_jobflow(@jobflow)
  jobflow_id = run_result['JobFlowId']
  commands.global_options[:jobflow] << jobflow_id 

  if have(:plain_output) then
    logger.puts jobflow_id
  else
    logger.puts "Created job flow " + jobflow_id
  end
end
have_role(instance_group_commands, role) click to toggle source
# File lib/commands.rb, line 892
def have_role(instance_group_commands, role)
  instance_group_commands.select { |x| 
    x.instance_role.upcase == role 
  }.size > 0
end
new_instance_group_command(role, instance_count, instance_type) click to toggle source
# File lib/commands.rb, line 883
def new_instance_group_command(role, instance_count, instance_type)
  igc = CreateInstanceGroupCommand.new(
    "--instance-group ROLE", "Specify an instance group", role, commands
  )
  igc.instance_count = instance_count
  igc.instance_type = instance_type
  return igc
end
setup_instance_groups() click to toggle source
# File lib/commands.rb, line 898
def setup_instance_groups
  instance_groups = []
  if ! have_role(instance_group_commands, "MASTER") then
    mit = get_field(:master_instance_type, get_field(:instance_type, "m1.small"))
    master_instance_group = new_instance_group_command("MASTER", 1, mit)
    instance_group_commands << master_instance_group
  end
  if ! have_role(instance_group_commands, "CORE") then
    ni = get_field(:instance_count, 1).to_i
    if ni > 1 then
      sit = get_field(:slave_instance_type, get_field(:instance_type, "m1.small"))
      slave_instance_group = new_instance_group_command("CORE", ni-1, sit)
      slave_instance_group.instance_role = "CORE"
      instance_group_commands << slave_instance_group
    end
  else
    # Verify that user has not specified both --instance-group core and --num-instances
    if get_field(:instance_count) != nil then
      raise RuntimeError, "option --num-instances cannot be used when a core instance group is specified."
    end
  end
end
validate() click to toggle source
# File lib/commands.rb, line 817
def validate
  for step in step_commands do
    if step.is_a?(EnableDebuggingCommand) then
      require(:log_uri, "You must supply a logUri if you enable debugging when creating a job flow")
    end
  end

  for cmd in step_commands + instance_group_commands + bootstrap_commands do
    cmd.validate
  end

end