class Impala::Connection
This object represents a connection to an Impala server. It can be used to perform queries on the database.
Constants
- LOG_CONTEXT_ID
Public Class Methods
Don't instantiate Connections directly; instead, use {Impala.connect}.
# File lib/impala/connection.rb, line 8
def initialize(host, port, options={})
  @host = host
  @port = port
  @options = options
  @connected = false
  open
end
Public Instance Methods
Close this connection. It can still be reopened with {#open}.
# File lib/impala/connection.rb, line 42
def close
  return unless @connected
  @transport.close
  @connected = false
end
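A minimal sketch of the @connected guard that #open and #close share, which is what makes a second close a no-op and a later reopen possible. FakeTransport and LifecycleDemo are hypothetical stand-ins written for illustration; no sockets or Thrift objects are involved.

```ruby
# Hypothetical stand-ins: nothing here touches the network.
class FakeTransport
  attr_reader :open_calls, :close_calls

  def initialize
    @open_calls = 0
    @close_calls = 0
  end

  def open
    @open_calls += 1
  end

  def close
    @close_calls += 1
  end
end

class LifecycleDemo
  attr_reader :transport

  def initialize
    # Unlike the real #open, which builds a new socket and transport on every
    # reopen, this demo reuses one fake transport so the calls can be counted.
    @transport = FakeTransport.new
    @connected = false
    open
  end

  def open
    return if @connected # already open: nothing to do

    @transport.open
    @connected = true
  end

  def close
    return unless @connected # already closed: nothing to do

    @transport.close
    @connected = false
  end

  def open?
    @connected
  end
end

conn = LifecycleDemo.new
conn.close
conn.close      # no-op: the transport is not closed twice
conn.open       # reopening after close is allowed
puts conn.open? # prints true
```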
Perform a query and return a cursor for iterating over the results.
@param [String] query the query you want to run
@param [Hash] query_options options for the query. The :user key sets the user to run the query as; every other key is sent to the server as a KEY=value configuration option (see TImpalaQueryOptions in ImpalaService.thrift).
@option query_options [String] :user the user to run the query as
@return [Cursor] a cursor for the result rows
# File lib/impala/connection.rb, line 78
def execute(query, query_options = {})
  raise ConnectionError.new("Connection closed") unless open?

  handle = send_query(query, query_options)
  check_result(handle)
  Cursor.new(handle, @service)
end
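The order of operations in #execute (refuse on a closed connection, submit the query, check its state, then wrap the handle in a cursor) can be traced with stubs. This is a sketch under stated assumptions: StubService, StubCursor, ExecuteDemo, and DemoConnectionError are hypothetical names, the method names on the stub are invented, and the state check is reduced to a single call.

```ruby
# Hypothetical stand-in for ConnectionError.
class DemoConnectionError < StandardError; end

# Records every call so the order can be checked afterwards.
StubService = Struct.new(:calls) do
  def execute_and_wait(query_text)
    calls << [:execute_and_wait, query_text]
    "handle-1" # stands in for the query handle the server returns
  end

  def get_state(handle)
    calls << [:get_state, handle]
    :finished
  end
end

StubCursor = Struct.new(:handle, :service)

class ExecuteDemo
  def initialize(service, connected: true)
    @service = service
    @connected = connected
  end

  def open?
    @connected
  end

  def execute(query)
    raise DemoConnectionError, "Connection closed" unless open?

    handle = @service.execute_and_wait(query) # stands in for send_query
    @service.get_state(handle)                # stands in for check_result
    StubCursor.new(handle, @service)          # the cursor keeps the handle and service
  end
end

service = StubService.new([])
cursor = ExecuteDemo.new(service).execute("SELECT 1")
p cursor.handle              # prints "handle-1"
p service.calls.map(&:first) # prints [:execute_and_wait, :get_state]

begin
  ExecuteDemo.new(service, connected: false).execute("SELECT 1")
rescue DemoConnectionError => e
  puts e.message # prints Connection closed
end
```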
# File lib/impala/connection.rb, line 16
def inspect
  "#<#{self.class} #{@host}:#{@port}#{open? ? '' : ' (DISCONNECTED)'}>"
end
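To see what #inspect produces in each state, here is a hypothetical InspectDemo class that reuses the same format string; the host and port are placeholders.

```ruby
# Hypothetical class reusing the inspect format shown above.
class InspectDemo
  def initialize(host, port, connected)
    @host = host
    @port = port
    @connected = connected
  end

  def open?
    @connected
  end

  def inspect
    "#<#{self.class} #{@host}:#{@port}#{open? ? '' : ' (DISCONNECTED)'}>"
  end
end

p InspectDemo.new("impala.example.com", 21000, true)
# prints #<InspectDemo impala.example.com:21000>
p InspectDemo.new("impala.example.com", 21000, false)
# prints #<InspectDemo impala.example.com:21000 (DISCONNECTED)>
```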
Open the connection if it's currently closed.
# File lib/impala/connection.rb, line 21
def open
  return if @connected

  socket = Thrift::Socket.new(@host, @port, @options[:timeout])

  if @options[:kerberos]
    @transport = SASLTransport.new(socket, :GSSAPI, @options[:kerberos])
  elsif @options[:sasl]
    @transport = SASLTransport.new(socket, :PLAIN, @options[:sasl])
  else
    @transport = Thrift::BufferedTransport.new(socket)
  end

  @transport.open

  proto = Thrift::BinaryProtocol.new(@transport)
  @service = Protocol::ImpalaService::Client.new(proto)
  @connected = true
end
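The if/elsif chain in #open gives :kerberos precedence over :sasl, and falls back to a plain buffered transport when neither is set. The sketch below isolates that precedence in a hypothetical helper, transport_choice, which returns a description instead of constructing Thrift objects. The contents of the option hashes are placeholders, not the gem's documented keys.

```ruby
# Hypothetical helper: describes which transport #open would build.
def transport_choice(options)
  if options[:kerberos]
    [:sasl, :GSSAPI, options[:kerberos]] # Kerberos is checked first
  elsif options[:sasl]
    [:sasl, :PLAIN, options[:sasl]]      # then username/password SASL
  else
    [:buffered]                          # otherwise a plain buffered transport
  end
end

# The option hash contents below are placeholders.
p transport_choice({})                               # prints [:buffered]
p transport_choice(sasl: { user: "alice" }).first(2) # prints [:sasl, :PLAIN]
p transport_choice(kerberos: { service: "impala" },
                   sasl: { user: "alice" }).first(2) # prints [:sasl, :GSSAPI]
```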
Returns true if the connection is currently open.
# File lib/impala/connection.rb, line 50
def open?
  @connected
end
Perform a query and return all the results. This loads the entire result set into memory, so if you're dealing with lots of rows, {#execute} may work better.
@param [String] query the query you want to run
@param [Hash] query_options options for the query. The :user key sets the user to run the query as; every other key is sent to the server as a KEY=value configuration option (see TImpalaQueryOptions in ImpalaService.thrift).
@option query_options [String] :user the user to run the query as
@return [Array<Hash>] an array of hashes, one for each row.
# File lib/impala/connection.rb, line 68
def query(query, query_options = {})
  execute(query, query_options).fetch_all
end
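The memory trade-off between #query and #execute comes down to collecting every row into an Array versus handling rows one at a time. LazyCursor below is hypothetical: it only illustrates that idea and does not reproduce the internals of Impala::Cursor.

```ruby
# Hypothetical cursor: rows are generated on demand.
class LazyCursor
  include Enumerable

  def initialize(total_rows)
    @total_rows = total_rows
  end

  # Yields one row at a time instead of building them all up front.
  def each
    return enum_for(:each) unless block_given?

    @total_rows.times { |i| yield({ id: i }) }
  end

  # Like Connection#query: collects the whole result set into an Array.
  def fetch_all
    to_a
  end
end

rows = LazyCursor.new(5).fetch_all
puts rows.size # prints 5

total = 0
LazyCursor.new(100_000).each { |row| total += row[:id] } # one row at a time
puts total # prints 4999950000
```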
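Refresh Impala's metadata catalog by calling the service's ResetCatalog method. Raises ConnectionError if the connection is closed.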
# File lib/impala/connection.rb, line 55
def refresh
  raise ConnectionError.new("Connection closed") unless open?

  @service.ResetCatalog
end
Private Instance Methods
# File lib/impala/connection.rb, line 100
def check_result(handle)
  state = @service.get_state(handle)
  if state == Protocol::Beeswax::QueryState::EXCEPTION
    close_handle(handle)
    raise ConnectionError.new("The query was aborted")
  end
rescue
  close_handle(handle)
  raise
end
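A sketch of the "close the handle, then raise" cleanup in check_result, using hypothetical stand-ins (FakeService, AbortedError, and plain symbols instead of Protocol::Beeswax::QueryState values). In the listing above, the EXCEPTION branch calls close_handle and then raises ConnectionError inside the same method; assuming ConnectionError descends from StandardError, the bare rescue catches it and calls close_handle a second time before re-raising. The sketch keeps the two failure paths apart so the handle is closed exactly once on each.

```ruby
# Hypothetical stand-in for ConnectionError.
class AbortedError < StandardError; end

class FakeService
  attr_reader :closed

  def initialize(state: :finished, fail_get_state: false)
    @state = state
    @fail_get_state = fail_get_state
    @closed = []
  end

  def get_state(_handle)
    raise IOError, "transport failure" if @fail_get_state

    @state
  end

  def close(handle)
    @closed << handle
  end
end

# Closes the handle exactly once on each failure path, then raises.
def check_result(service, handle)
  begin
    state = service.get_state(handle)
  rescue StandardError
    service.close(handle) # clean up, then re-raise the original error
    raise
  end
  return unless state == :exception

  service.close(handle)
  raise AbortedError, "The query was aborted"
end

aborted = FakeService.new(state: :exception)
begin
  check_result(aborted, "h1")
rescue AbortedError => e
  puts e.message # prints The query was aborted
end
p aborted.closed # prints ["h1"]
```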
# File lib/impala/connection.rb, line 111
def close_handle(handle)
  @service.close(handle)
end
# File lib/impala/connection.rb, line 88
def send_query(query_text, query_options)
  query = Protocol::Beeswax::Query.new
  query.query = query_text
  query.hadoop_user = query_options.delete(:user) if query_options[:user]
  query.configuration = query_options.map do |key, value|
    "#{key.upcase}=#{value}"
  end

  @service.executeAndWait(query, LOG_CONTEXT_ID)
end
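send_query splits query_options into a user name and a list of KEY=value configuration strings. Because it calls delete on the hash it receives, and #execute and #query pass the caller's hash straight through, a :user key is removed from the caller's hash as a side effect. The hypothetical helper split_query_options below reproduces the same split on a copy instead; mem_limit and num_nodes are only example keys, so check TImpalaQueryOptions for the options your server accepts.

```ruby
# Hypothetical helper mirroring send_query's option handling, applied to a copy.
def split_query_options(query_options)
  options = query_options.dup
  user = options.delete(:user) if options[:user]
  configuration = options.map { |key, value| "#{key.upcase}=#{value}" }
  [user, configuration]
end

opts = { user: "alice", mem_limit: "2g", num_nodes: 1 }
user, config = split_query_options(opts)
p user             # prints "alice"
p config           # prints ["MEM_LIMIT=2g", "NUM_NODES=1"]
p opts.key?(:user) # prints true: the caller's hash keeps :user
```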