class Impala::Connection
This object represents a connection to an Impala
server. It can be used to perform queries on the database.
Attributes
Public Class Methods
Don't instantiate Connections directly; instead, use {Impala.connect}.
# File lib/impala/connection.rb, line 8
# Records the connection settings and opens the connection right away.
#
# @param host the server host, stored as given
# @param port the server port, stored as given
# @param [Hash] options connection options; a copy is stored, and
#   :transport falls back to :buffered when it is not set
def initialize(host, port, options = {})
  @connected = false
  @host, @port = host, port
  @options = options.dup
  @options[:transport] = :buffered unless @options[:transport]
  open
end
Public Instance Methods
Close this connection. It can still be reopened with {#open}.
# File lib/impala/connection.rb, line 67
# Closes the underlying transport and marks the connection as closed.
# Does nothing when the connection is not currently open.
def close
  if @connected
    @transport.close
    @connected = false
  end
end
# File lib/impala/connection.rb, line 114
# Asks the service client to close the given handle.
#
# @param handle the handle to pass to the service's +close+ call
# @return whatever the service's +close+ call returns
def close_handle(handle)
  service = @service
  service.close(handle)
end
Perform a query and return a cursor for iterating over the results.
@param [String] raw_query the query you want to run
@param [Hash] query_options options that set the user and the query configuration; for every key except :user, see TImpalaQueryOptions in ImpalaService.thrift
@option query_options [String] :user the user who runs the query
@return [Cursor] a cursor for the result rows
# File lib/impala/connection.rb, line 103
# Sanitizes and sends a query, then returns a cursor over its results.
#
# @param [String] raw_query the query text as supplied by the caller
# @param [Hash] query_options options forwarded to send_query
# @return [Cursor] the cursor, returned after +wait!+ has been called on it
# @raise [ConnectionError] if the connection is not open
def execute(raw_query, query_options = {})
  raise ConnectionError.new("Connection closed") unless open?

  handle = send_query(sanitize_query(raw_query), query_options)
  Cursor.new(handle, @service, @options).tap { |cursor| cursor.wait! }
end
# File lib/impala/connection.rb, line 17
# Short description of the connection: class name, host:port, and a
# " (DISCONNECTED)" marker when the connection is not open.
#
# @return [String]
def inspect
  status = open? ? '' : ' (DISCONNECTED)'
  "#<#{self.class} #{@host}:#{@port}#{status}>"
end
Open the connection if it's currently closed.
# File lib/impala/connection.rb, line 22
# Builds and opens the Thrift transport, creates the service client on top
# of a binary protocol, and marks the connection as open.
# Does nothing when the connection is already open.
def open
  unless @connected
    @transport = thrift_transport(host, port)
    @transport.open

    protocol = Thrift::BinaryProtocol.new(@transport)
    @service = Protocol::ImpalaService::Client.new(protocol)
    @connected = true
  end
end
Returns true if the connection is currently open.
# File lib/impala/connection.rb, line 75
# Reports the connection state flag maintained by #open and #close.
#
# @return the current value of the connected flag
def open?
  @connected
end
Processes SASL connection params and returns a hash with symbol keys, or nil when the params are not a Hash
# File lib/impala/connection.rb, line 55
# Normalizes SASL connection params.
#
# @param sasl_params the raw params; only a Hash is processed
# @return [Hash, nil] a new hash with every key converted via +to_sym+,
#   or nil when +sasl_params+ is not a Hash
def parse_sasl_params(sasl_params)
  return nil unless sasl_params.kind_of?(Hash)

  # Symbolize keys into a fresh hash; the input is left untouched.
  sasl_params.each_with_object({}) do |(key, value), symbolized|
    symbolized[key.to_sym] = value
  end
end
Perform a query and return all the results. This will load the entire result set into memory, so if you're dealing with lots of rows, {#execute} may work better.
@param [String] raw_query the query you want to run
@param [Hash] query_options options that set the user and the query configuration; for every key except :user, see TImpalaQueryOptions in ImpalaService.thrift
@option query_options [String] :user the user who runs the query
@return [Array<Hash>] an array of hashes, one for each row.
# File lib/impala/connection.rb, line 93
# Runs the query through #execute and returns the cursor's +fetch_all+
# result.
#
# @param [String] raw_query the query text as supplied by the caller
# @param [Hash] query_options options forwarded to #execute
def query(raw_query, query_options = {})
  cursor = execute(raw_query, query_options)
  cursor.fetch_all
end
Refresh the metadata store.
# File lib/impala/connection.rb, line 80
# Calls +ResetCatalog+ on the service client.
#
# @raise [ConnectionError] if the connection is not open
def refresh
  if open?
    @service.ResetCatalog
  else
    raise ConnectionError.new("Connection closed")
  end
end
# File lib/impala/connection.rb, line 48
# Creates a Thrift socket for the given endpoint and assigns its timeout.
#
# @param server the host passed to Thrift::Socket.new
# @param port the port passed to Thrift::Socket.new
# @param timeout the value assigned to the socket's +timeout+
# @return [Thrift::Socket] the configured socket
def thrift_socket(server, port, timeout)
  Thrift::Socket.new(server, port).tap { |socket| socket.timeout = timeout }
end
# File lib/impala/connection.rb, line 33
# Builds the Thrift transport selected by @options[:transport].
#
# @param server the host passed on to thrift_socket
# @param port the port passed on to thrift_socket
# @return a Thrift::BufferedTransport for :buffered, or a SASLTransport
#   for :sasl
# @raise [ArgumentError] if the transport is :sasl but @options[:sasl_params]
#   is not a Hash
# @raise [RuntimeError] if the transport type is not recognised
def thrift_transport(server, port)
  socket = thrift_socket(server, port, @options[:timeout])

  case @options[:transport]
  when :buffered
    Thrift::BufferedTransport.new(socket)
  when :sasl
    opts = parse_sasl_params(@options[:sasl_params])
    # parse_sasl_params returns nil for anything that is not a Hash. Fail
    # with a clear message instead of a NoMethodError on nil below.
    raise ArgumentError, "SASL transport requires :sasl_params to be a Hash" if opts.nil?

    mechanism = opts.delete(:mechanism)
    SASLTransport.new(socket, mechanism, opts)
  else
    raise "Unrecognised transport type '#{@options[:transport]}'"
  end
end
Private Instance Methods
# File lib/impala/connection.rb, line 120
# Normalizes a raw query: splits it on whitespace, lowercases the first
# word, and rejoins all words with single spaces.
#
# @param [String] raw_query the query text as supplied by the caller
# @return [String] the normalized query
# @raise [InvalidQueryError] if the query contains no words
def sanitize_query(raw_query)
  command, *rest = raw_query.split
  raise InvalidQueryError.new("Empty query") if command.nil?

  [command.downcase, *rest].join(' ')
end
# File lib/impala/connection.rb, line 128
# Builds a Beeswax query from the sanitized text and options and submits it
# through the service client.
#
# The :user option, when truthy, becomes the query's hadoop_user. Every
# other option is sent as a "KEY=value" configuration entry with the key
# upcased. The caller's hash is never modified, so the same options hash can
# be reused across queries without losing its :user entry.
#
# @param [String] sanitized_query the query text to send
# @param [Hash] query_options the :user entry plus configuration options
# @return whatever the service's +query+ call returns
def send_query(sanitized_query, query_options)
  # Work on a copy: deleting :user from the caller's hash would silently
  # drop the user on the next query that reuses the same options.
  options = query_options.dup
  user = options.delete(:user)

  query = Protocol::Beeswax::Query.new
  query.query = sanitized_query
  query.hadoop_user = user if user
  query.configuration = options.map { |key, value| "#{key.upcase}=#{value}" }

  @service.query(query)
end