class EmrClient
Attributes
commands[RW]
logger[RW]
options[RW]
Public Class Methods
new(commands, logger, client_class)
click to toggle source
# File lib/client.rb, line 11 def initialize(commands, logger, client_class) @commands = commands @logger = logger @options = commands.global_options @config = { :endpoint => @options[:endpoint] || "https://elasticmapreduce.amazonaws.com", :ca_file => File.join(File.dirname(__FILE__), "cacert.pem"), :aws_access_key => @options[:aws_access_id], :aws_secret_key => @options[:aws_secret_key], :signature_algorithm => :V2, :content_type => 'JSON', :verbose => (@options[:verbose] != nil), :connect_timeout => 60.0, :timeout => 160.0 } @client = Amazon::RetryDelegator.new( client_class.new_aws_query(@config), :retry_if => Proc.new { |*opts| self.is_retryable_error_response(*opts) } ) end
Public Instance Methods
add_instance_groups(options)
click to toggle source
# File lib/client.rb, line 111 def add_instance_groups(options) logger.trace "AddInstanceGroups(#{options.inspect})" result = @client.AddInstanceGroups(options) logger.trace result.inspect return raise_on_error(result) end
add_steps(jobflow_id, steps)
click to toggle source
# File lib/client.rb, line 69 def add_steps(jobflow_id, steps) logger.trace "AddJobFlowSteps('JobFlowId' => #{jobflow_id.inspect}, 'Steps' => #{steps.inspect})" result = @client.AddJobFlowSteps('JobFlowId' => jobflow_id, 'Steps' => steps) logger.trace result.inspect return raise_on_error(result) end
describe_jobflow(options)
click to toggle source
# File lib/client.rb, line 83 def describe_jobflow(options) logger.trace "DescribeJobFlows(#{options.inspect})" result = @client.DescribeJobFlows(options.merge('DescriptionType' => 'EXTENDED')) logger.trace result.inspect return raise_on_error(result) end
describe_jobflow_with_id(jobflow_id)
click to toggle source
# File lib/client.rb, line 58 def describe_jobflow_with_id(jobflow_id) logger.trace "DescribeJobFlows('JobFlowIds' => [ #{jobflow_id} ])" result = @client.DescribeJobFlows('JobFlowIds' => [ jobflow_id ], 'DescriptionType' => 'EXTENDED') logger.trace result.inspect raise_on_error(result) if result == nil || result['JobFlows'].size() == 0 then raise RuntimeError, "Jobflow with id #{jobflow_id} not found" end return result['JobFlows'].first end
is_error_response(response)
click to toggle source
# File lib/client.rb, line 47 def is_error_response(response) response != nil && response.key?('Error') end
is_retryable_error_response(response)
click to toggle source
# File lib/client.rb, line 34 def is_retryable_error_response(response) if response == nil then false else ret = false if response['Error'] then # note: 'Timeout' is not retryable because the operation might have completed just the connection timed out ret ||= ['InternalFailure', 'Throttling', 'ServiceUnavailable'].include?(response['Error']['Code']) end ret end end
modify_instance_groups(options)
click to toggle source
# File lib/client.rb, line 104 def modify_instance_groups(options) logger.trace "ModifyInstanceGroups(#{options.inspect})" result = @client.ModifyInstanceGroups(options) logger.trace result.inspect return raise_on_error(result) end
raise_on_error(response)
click to toggle source
# File lib/client.rb, line 51 def raise_on_error(response) if is_error_response(response) then raise RuntimeError, response["Error"]["Message"] end return response end
run_jobflow(jobflow)
click to toggle source
# File lib/client.rb, line 76 def run_jobflow(jobflow) logger.trace "RunJobFlow(#{jobflow.inspect})" result = @client.RunJobFlow(jobflow) logger.trace result.inspect return raise_on_error(result) end
set_termination_protection(jobflow_ids, protected)
click to toggle source
# File lib/client.rb, line 90 def set_termination_protection(jobflow_ids, protected) logger.trace "SetTerminationProtection('JobFlowIds' => #{jobflow_ids.inspect}, 'TerminationProtected' => #{protected})" result = @client.SetTerminationProtection('JobFlowIds' => jobflow_ids, 'TerminationProtected' => protected) logger.trace result.inspect return raise_on_error(result) end
terminate_jobflows(jobflow_ids)
click to toggle source
# File lib/client.rb, line 97 def terminate_jobflows(jobflow_ids) logger.trace "TerminateJobFlows('JobFlowIds' => #{jobflow_ids.inspect})" result = @client.TerminateJobFlows('JobFlowIds' => jobflow_ids) logger.trace result.inspect return raise_on_error(result) end