class EmrClient

Attributes

commands[RW]
logger[RW]
options[RW]

Public Class Methods

new(commands, logger, client_class) click to toggle source
# File lib/client.rb, line 11
def initialize(commands, logger, client_class)
  @commands = commands
  @logger = logger
  @options = commands.global_options

  @config = {
    :endpoint            => @options[:endpoint] || "https://elasticmapreduce.amazonaws.com",
    :ca_file             => File.join(File.dirname(__FILE__), "cacert.pem"),
    :aws_access_key      => @options[:aws_access_id],
    :aws_secret_key      => @options[:aws_secret_key],
    :signature_algorithm => :V2,
    :content_type        => 'JSON',
    :verbose             => (@options[:verbose] != nil),
    :connect_timeout     => 60.0,
    :timeout             => 160.0
  }

  @client = Amazon::RetryDelegator.new(
    client_class.new_aws_query(@config),
    :retry_if => Proc.new { |*opts| self.is_retryable_error_response(*opts) }
  )
end

Public Instance Methods

add_instance_groups(options) click to toggle source
# File lib/client.rb, line 111
def add_instance_groups(options)
  logger.trace "AddInstanceGroups(#{options.inspect})"
  result = @client.AddInstanceGroups(options)
  logger.trace result.inspect
  return raise_on_error(result)
end
add_steps(jobflow_id, steps) click to toggle source
# File lib/client.rb, line 69
def add_steps(jobflow_id, steps)
  logger.trace "AddJobFlowSteps('JobFlowId' => #{jobflow_id.inspect}, 'Steps' => #{steps.inspect})"
  result = @client.AddJobFlowSteps('JobFlowId' => jobflow_id, 'Steps' => steps)
  logger.trace result.inspect
  return raise_on_error(result)
end
describe_jobflow(options) click to toggle source
# File lib/client.rb, line 83
def describe_jobflow(options)
  logger.trace "DescribeJobFlows(#{options.inspect})"
  result = @client.DescribeJobFlows(options.merge('DescriptionType' => 'EXTENDED'))
  logger.trace result.inspect
  return raise_on_error(result)
end
describe_jobflow_with_id(jobflow_id) click to toggle source
# File lib/client.rb, line 58
def describe_jobflow_with_id(jobflow_id)
  logger.trace "DescribeJobFlows('JobFlowIds' => [ #{jobflow_id} ])"
  result = @client.DescribeJobFlows('JobFlowIds' => [ jobflow_id ], 'DescriptionType' => 'EXTENDED')
  logger.trace result.inspect
  raise_on_error(result)
  if result == nil || result['JobFlows'].size() == 0 then
    raise RuntimeError, "Jobflow with id #{jobflow_id} not found"
  end
  return result['JobFlows'].first
end
is_error_response(response) click to toggle source
# File lib/client.rb, line 47
def is_error_response(response)
  response != nil && response.key?('Error')
end
is_retryable_error_response(response) click to toggle source
# File lib/client.rb, line 34
def is_retryable_error_response(response)
  if response == nil then
    false
  else
    ret = false
    if response['Error'] then 
      # note: 'Timeout' is not retryable because the operation might have completed just the connection timed out
      ret ||= ['InternalFailure', 'Throttling', 'ServiceUnavailable'].include?(response['Error']['Code'])
    end
    ret 
  end
end
modify_instance_groups(options) click to toggle source
# File lib/client.rb, line 104
def modify_instance_groups(options)
  logger.trace "ModifyInstanceGroups(#{options.inspect})"
  result = @client.ModifyInstanceGroups(options)
  logger.trace result.inspect
  return raise_on_error(result)
end
raise_on_error(response) click to toggle source
# File lib/client.rb, line 51
def raise_on_error(response)
  if is_error_response(response) then
    raise RuntimeError, response["Error"]["Message"]
  end
  return response
end
run_jobflow(jobflow) click to toggle source
# File lib/client.rb, line 76
def run_jobflow(jobflow)
  logger.trace "RunJobFlow(#{jobflow.inspect})"
  result = @client.RunJobFlow(jobflow)
  logger.trace result.inspect
  return raise_on_error(result)
end
set_termination_protection(jobflow_ids, protected) click to toggle source
# File lib/client.rb, line 90
def set_termination_protection(jobflow_ids, protected)
  logger.trace "SetTerminationProtection('JobFlowIds' => #{jobflow_ids.inspect}, 'TerminationProtected' => #{protected})"
  result = @client.SetTerminationProtection('JobFlowIds' => jobflow_ids, 'TerminationProtected' => protected)
  logger.trace result.inspect
  return raise_on_error(result)
end
terminate_jobflows(jobflow_ids) click to toggle source
# File lib/client.rb, line 97
def terminate_jobflows(jobflow_ids)
  logger.trace "TerminateJobFlows('JobFlowIds' => #{jobflow_ids.inspect})"
  result = @client.TerminateJobFlows('JobFlowIds' => jobflow_ids)
  logger.trace result.inspect
  return raise_on_error(result)
end