class Presto::Client::StatementClient
Constants
- JSON_OPTIONS
Presto can return JSON that is too deeply nested for default parsing; these options are passed to JSON.parse.
Attributes
query[R]
Public Class Methods
new(faraday, query, options, next_uri=nil)
click to toggle source
# File lib/presto/client/statement_client.rb, line 29
#
# Starts a new query, or resumes an existing one when +next_uri+ is given.
#
# @param faraday [Object] connection object; this class calls get, post and
#   delete on it
# @param query [String] statement text, sent as the body of the initial POST
# @param options [Hash] this method reads :retry_timeout (default 120),
#   :model_version, :plan_timeout and :query_timeout
# @param next_uri [String, nil] when given, the first result page is fetched
#   from this URI instead of posting the query
def initialize(faraday, query, options, next_uri=nil)
  @faraday = faraday
  @options = options
  @query = query
  @state = :running
  @retry_timeout = options[:retry_timeout] || 120

  model_version = @options[:model_version]
  if model_version
    # "1.2" selects ModelVersions::V1_2. to_s lets callers pass a non-String
    # version (e.g. a Float or Symbol) without hitting NoMethodError.
    @models = ModelVersions.const_get("V#{model_version.to_s.tr('.', '_')}")
  else
    @models = Models
  end

  @plan_timeout = options[:plan_timeout]
  @query_timeout = options[:query_timeout]

  if @plan_timeout || @query_timeout
    # this is set before the first call of faraday_get_with_retry so that
    # resuming StatementClient with next_uri is also under timeout control.
    @started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  if next_uri
    response = faraday_get_with_retry(next_uri)
    @results_headers = response.headers
    @results = @models::QueryResults.decode(parse_body(response))
  else
    post_query_request!
  end
end
Public Instance Methods
advance()
click to toggle source
# File lib/presto/client/statement_client.rb, line 138
#
# Loads the next page of results into the client.
#
# @return [Boolean] false when the client is not running, or when there is no
#   next URI (the state then becomes :finished); true once a new page has been
#   fetched and decoded
def advance
  return false unless running?

  if has_next?
    next_page_uri = @results.next_uri
    page = faraday_get_with_retry(next_page_uri)
    @results_headers = page.headers
    @results = decode_model(next_page_uri, parse_body(page), @models::QueryResults)

    # Checked after the new page is stored so the timeout logic sees it.
    raise_if_timeout!

    true
  else
    @state = :finished
    false
  end
end
cancel_leaf_stage()
click to toggle source
# File lib/presto/client/statement_client.rb, line 253
#
# Sends a DELETE request to the partial-cancel URI of the current results.
#
# @return [Object, nil] whatever the connection's delete call returns, or nil
#   when the current results carry no partial-cancel URI
def cancel_leaf_stage
  cancel_uri = @results.partial_cancel_uri
  return unless cancel_uri

  @faraday.delete { |req| req.url cancel_uri }
end
client_aborted?()
click to toggle source
# File lib/presto/client/statement_client.rb, line 97 def client_aborted? @state == :client_aborted end
client_error?()
click to toggle source
# File lib/presto/client/statement_client.rb, line 101 def client_error? @state == :client_error end
close()
click to toggle source
# File lib/presto/client/statement_client.rb, line 261
#
# Aborts a running query from the client side: marks the client as
# :client_aborted and sends a DELETE request to the next URI of the current
# results, if there is one. Does nothing when the client is not running.
#
# @return [nil]
def close
  return unless running?

  @state = :client_aborted

  begin
    next_uri = @results.next_uri
    @faraday.delete { |req| req.url next_uri } if next_uri
  rescue StandardError
    # Best effort: the client is already marked as aborted, so a failure to
    # deliver the DELETE request is deliberately ignored.
  end

  nil
end
current_results()
click to toggle source
# File lib/presto/client/statement_client.rb, line 117 def current_results @results end
current_results_headers()
click to toggle source
# File lib/presto/client/statement_client.rb, line 121 def current_results_headers @results_headers end
debug?()
click to toggle source
# File lib/presto/client/statement_client.rb, line 89
#
# @return [Boolean] true when the :debug option is set to a truthy value
def debug?
  @options[:debug] ? true : false
end
exception!(e)
click to toggle source
# File lib/presto/client/statement_client.rb, line 133 def exception!(e) @state = :client_error raise e end
faraday_get_with_retry(uri, &block)
click to toggle source
# File lib/presto/client/statement_client.rb, line 192
#
# Issues GET +uri+ and retries until a usable response arrives.
#
# Faraday timeout / connection-failure errors and HTTP 503 responses are
# retried, sleeping 0.1s * attempt number between attempts, while less than
# @retry_timeout seconds have elapsed and the client has not been aborted.
# Any other exception, and any response that is neither 503 nor a 200 with a
# non-empty body, is raised through exception!.
#
# @param uri [String] URI to fetch
# @param block accepted but not used by this method
# @return [Object] the response with status 200 and a non-empty body
# @raise [PrestoHttpError] on a non-retryable status, or with status 408 once
#   the retry loop ends without a usable response
def faraday_get_with_retry(uri, &block)
  retry_started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  tries = 0

  # A do-while loop: the request is always attempted at least once.
  begin
    response =
      begin
        @faraday.get(uri)
      rescue Faraday::Error::TimeoutError, Faraday::Error::ConnectionFailed
        # temporary error; retry
        nil
      rescue => e
        exception! e
      end

    if response
      return response if response.status == 200 && !response.body.to_s.empty?

      unless response.status == 503
        # retry only if 503 Service Unavailable
        # deterministic error
        exception! PrestoHttpError.new(response.status, "Presto API error at #{uri} returned #{response.status}: #{response.body}")
      end
    end

    raise_if_timeout!

    tries += 1
    sleep tries * 0.1
  end while (Process.clock_gettime(Process::CLOCK_MONOTONIC) - retry_started_at) < @retry_timeout && !client_aborted?

  exception! PrestoHttpError.new(408, "Presto API error due to timeout")
end
finished?()
click to toggle source
# File lib/presto/client/statement_client.rb, line 105 def finished? @state == :finished end
has_next?()
click to toggle source
# File lib/presto/client/statement_client.rb, line 129
#
# @return [Boolean] true when the current results carry a next URI
def has_next?
  @results.next_uri ? true : false
end
query_failed?()
click to toggle source
# File lib/presto/client/statement_client.rb, line 109
#
# @return [Boolean] true when the current results carry an error
def query_failed?
  !@results.error.nil?
end
query_id()
click to toggle source
# File lib/presto/client/statement_client.rb, line 125 def query_id @results.id end
query_info()
click to toggle source
# File lib/presto/client/statement_client.rb, line 157
#
# Fetches /v1/query/<id> for the current query (with retries) and decodes the
# response body as @models::QueryInfo.
#
# @return [Object] the decoded QueryInfo model
def query_info
  info_path = "/v1/query/#{@results.id}"
  info_response = faraday_get_with_retry(info_path)
  decode_model(info_path, parse_body(info_response), @models::QueryInfo)
end
query_succeeded?()
click to toggle source
# File lib/presto/client/statement_client.rb, line 113
#
# @return [Boolean] true when the current results carry no error and the
#   client has reached the :finished state
def query_succeeded?
  @results.error.nil? && finished?
end
raise_if_timeout!()
click to toggle source
# File lib/presto/client/statement_client.rb, line 226
#
# Raises a timeout error (via raise_timeout_error!) when the query has been
# running longer than the configured :query_timeout, or longer than
# :plan_timeout while no result schema is available yet.
#
# Does nothing when no timeout was configured (@started_at is unset) or when
# the client has already finished.
#
# @return [nil] when no timeout has been exceeded
def raise_if_timeout!
  return unless @started_at
  return if finished?

  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - @started_at

  raise_timeout_error! if @query_timeout && elapsed > @query_timeout

  # @results is not set (even first faraday_get_with_retry isn't called yet) or
  # result from Presto doesn't include result schema. Query planning isn't done yet.
  if @plan_timeout && (@results.nil? || @results.columns.nil?) && elapsed > @plan_timeout
    raise_timeout_error!
  end
end
raise_timeout_error!()
click to toggle source
# File lib/presto/client/statement_client.rb, line 245
#
# Raises PrestoQueryTimeoutError through exception!. The message includes the
# query id when the current results provide one.
#
# @raise [PrestoQueryTimeoutError] always
def raise_timeout_error!
  timed_out_id = @results && @results.id
  message = timed_out_id ? "Query #{timed_out_id} timed out" : "Query timed out"
  exception! PrestoQueryTimeoutError.new(message)
end
running?()
click to toggle source
# File lib/presto/client/statement_client.rb, line 93 def running? @state == :running end
Private Instance Methods
decode_model(uri, hash, body_class)
click to toggle source
# File lib/presto/client/statement_client.rb, line 163
#
# Decodes a parsed response body into the given model class.
#
# @param uri [String] URI the body came from; used only in the error message
# @param hash [Object] parsed response body
# @param body_class [#decode] model class used to decode +hash+
# @return [Object] the value returned by body_class.decode
# @raise [PrestoHttpError] status 500 when decoding fails; the message embeds
#   the JSON dump of +hash+, cut to 1024 characters plus "..." when it is
#   longer than 1027 characters
def decode_model(uri, hash, body_class)
  body_class.decode(hash)
rescue => e
  dumped = JSON.dump(hash)
  dumped = "#{dumped[0, 1024]}..." if dumped.size > 1024 + 3
  exception! PrestoHttpError.new(500, "Presto API returned unexpected structure at #{uri}. Expected #{body_class} but got #{dumped}: #{e}")
end
init_request(req)
click to toggle source
# File lib/presto/client/statement_client.rb, line 60
#
# Applies the HTTP timeouts from the client options to a request.
#
# @param req [Object] request whose options accept timeout= and open_timeout=
# @return [Object] the open timeout that was applied
def init_request(req)
  read_timeout = @options[:http_timeout] || 300
  connect_timeout = @options[:http_open_timeout] || 60

  req.options.timeout = read_timeout
  req.options.open_timeout = connect_timeout
end
parse_body(response)
click to toggle source
# File lib/presto/client/statement_client.rb, line 177
#
# Parses a response body according to its Content-Type header:
# 'application/x-msgpack' bodies go through MessagePack.load, everything else
# is parsed as JSON with JSON_OPTIONS.
#
# @param response [Object] response exposing headers and body
# @return [Object] the parsed body
# @raise [PrestoHttpError] status 500 (through exception!) when parsing fails
def parse_body(response)
  case response.headers['Content-Type']
  when 'application/x-msgpack'
    MessagePack.load(response.body)
  else
    JSON.parse(response.body, JSON_OPTIONS)
  end
rescue => e
  exception! PrestoHttpError.new(500, "Presto API returned unexpected data format. #{e}")
end
post_query_request!()
click to toggle source
# File lib/presto/client/statement_client.rb, line 67
#
# Posts the query text to /v1/statement and stores the decoded first page of
# results together with the response headers.
#
# @return [Object] the decoded QueryResults model
# @raise [PrestoHttpError] (through exception!) when the response status is
#   not 200
def post_query_request!
  statement_path = "/v1/statement"

  response = @faraday.post do |req|
    req.url statement_path
    req.body = @query
    init_request(req)
  end

  # TODO error handling
  unless response.status == 200
    exception! PrestoHttpError.new(response.status, "Failed to start query: #{response.body} (#{response.status})")
  end

  @results_headers = response.headers
  @results = decode_model(statement_path, parse_body(response), @models::QueryResults)
end