class Impala::Connection

This object represents a connection to an Impala server. It can be used to perform queries on the database.

Attributes

host[RW]
port[RW]

Public Class Methods

new(host, port, options={}) click to toggle source

Don't instantiate Connections directly; instead, use {Impala.connect}.

   # File lib/impala/connection.rb
# Record the connection settings and open the connection right away.
# Callers should go through Impala.connect rather than calling this.
#
# @param host [String] server host name (stored in @host)
# @param port [Integer] server port (stored in @port)
# @param options [Hash] copied before use; :transport falls back to
#   :buffered when absent or falsy
def initialize(host, port, options={})
  @host = host
  @port = port
  @options = options.dup
  # Fall back to the buffered transport when none (or a falsy one) was given.
  @options[:transport] = :buffered unless @options[:transport]
  @connected = false
  open
end

Public Instance Methods

close() click to toggle source

Close this connection. It can still be reopened with {#open}.

   # File lib/impala/connection.rb
# Close this connection; it can be reopened later with #open.
# Does nothing when the connection is not currently open.
#
# @return [false, nil] false after closing, nil when already closed
def close
  return unless @connected

  begin
    @transport.close
  ensure
    # Clear the flag even if the transport raised; otherwise #open would
    # treat the connection as live and never rebuild the dead transport.
    @connected = false
  end
  false # keep the historical return value
end
close_handle(handle) click to toggle source
    # File lib/impala/connection.rb
# Ask the service to release a query handle (for example, one returned
# by #send_query). The connection state is not checked first.
#
# @param handle the query handle to release
# @return whatever @service.close returns
def close_handle(handle)
  service = @service
  service.close(handle)
end
execute(raw_query, query_options = {}) click to toggle source

Perform a query and return a cursor for iterating over the results. @param [String] raw_query the query you want to run @param [Hash] query_options the user and configuration options for this query

Every key except :user is a query option; see TImpalaQueryOptions in ImpalaService.thrift.

@option query_options [String] :user the user to run the query as @return [Cursor] a cursor for the result rows

    # File lib/impala/connection.rb
# Run a query and hand back a cursor over its results.
#
# @param raw_query [String] query text; normalised by #sanitize_query
# @param query_options [Hash] :user plus per-query configuration,
#   forwarded to #send_query
# @return [Cursor] a cursor on which wait! has already been called
# @raise [ConnectionError] when the connection is closed
def execute(raw_query, query_options = {})
  raise ConnectionError.new("Connection closed") unless open?

  handle = send_query(sanitize_query(raw_query), query_options)
  # Block until the query is ready before returning the cursor.
  Cursor.new(handle, @service, @options).tap(&:wait!)
end
inspect() click to toggle source
   # File lib/impala/connection.rb
17 def inspect
18   "#<#{self.class} #{@host}:#{@port}#{open? ? '' : ' (DISCONNECTED)'}>"
19 end
open() click to toggle source

Open the connection if it's currently closed.

   # File lib/impala/connection.rb
# Open the connection unless it is already open. Builds the transport
# via #thrift_transport, opens it, and wraps it in a binary-protocol
# ImpalaService client.
#
# @return [true, nil] true after opening, nil when already open
def open
  return if @connected

  # @transport is assigned before it is opened, as before.
  @transport = thrift_transport(host, port)
  @transport.open

  @service = Protocol::ImpalaService::Client.new(Thrift::BinaryProtocol.new(@transport))
  @connected = true
end
open?() click to toggle source

Returns true if the connection is currently open.

   # File lib/impala/connection.rb
# Whether the connection is currently open.
#
# @return the @connected flag (set by #open, cleared by #close)
def open?
  @connected
end
parse_sasl_params(sasl_params) click to toggle source

Processes SASL connection params and returns a hash with symbol keys, or nil if the params are not a Hash.

   # File lib/impala/connection.rb
# Normalise SASL connection params (as read from options[:sasl_params]).
#
# @param sasl_params [Hash, Object] raw SASL settings
# @return [Hash{Symbol=>Object}, nil] a new hash whose keys have been
#   converted with #to_sym, or nil when sasl_params is not a Hash
def parse_sasl_params(sasl_params)
  return nil unless sasl_params.is_a?(Hash)

  sasl_params.each_with_object({}) do |(key, value), symbolized|
    symbolized[key.to_sym] = value
  end
end
query(raw_query, query_options = {}) click to toggle source

Perform a query and return all the results. This loads the entire result set into memory, so if you're dealing with lots of rows, {#execute} may work better. @param [String] raw_query the query you want to run @param [Hash] query_options the user and configuration options for this query

Every key except :user is a query option; see TImpalaQueryOptions in ImpalaService.thrift.

@option query_options [String] :user the user to run the query as @return [Array<Hash>] an array of hashes, one for each row.

   # File lib/impala/connection.rb
# Run a query and return every result row at once. The whole result set
# is loaded into memory; prefer #execute for large results.
#
# @param raw_query [String] the query to run
# @param query_options [Hash] forwarded unchanged to #execute
# @return the value of the cursor's fetch_all
def query(raw_query, query_options = {})
  cursor = execute(raw_query, query_options)
  cursor.fetch_all
end
refresh() click to toggle source

Refresh the metadata store.

   # File lib/impala/connection.rb
# Refresh the metadata store by calling ResetCatalog on the service.
#
# @return whatever @service.ResetCatalog returns
# @raise [ConnectionError] when the connection is closed
def refresh
  return @service.ResetCatalog if open?

  raise ConnectionError.new("Connection closed")
end
thrift_socket(server, port, timeout) click to toggle source
   # File lib/impala/connection.rb
# Build an (unopened) Thrift socket to server:port with the given timeout.
#
# @return [Thrift::Socket]
def thrift_socket(server, port, timeout)
  Thrift::Socket.new(server, port).tap { |sock| sock.timeout = timeout }
end
thrift_transport(server, port) click to toggle source
   # File lib/impala/connection.rb
# Build the (unopened) transport selected by @options[:transport].
#
# @param server [String] host to connect to
# @param port [Integer] port to connect to
# @return a Thrift::BufferedTransport for :buffered, or a SASLTransport
#   for :sasl
# @raise [RuntimeError] for any other transport type
def thrift_transport(server, port)
  socket = thrift_socket(server, port, @options[:timeout])

  case @options[:transport]
  when :buffered
    Thrift::BufferedTransport.new(socket)
  when :sasl
    # parse_sasl_params returns nil unless :sasl_params is a Hash; fall back
    # to an empty hash so a missing config no longer dies with NoMethodError
    # on nil. SASLTransport then receives a nil mechanism and can validate it.
    opts = parse_sasl_params(@options[:sasl_params]) || {}
    mechanism = opts.delete(:mechanism)
    SASLTransport.new(socket, mechanism, opts)
  else
    raise "Unrecognised transport type '#{@options[:transport]}'"
  end
end

Private Instance Methods

sanitize_query(raw_query) click to toggle source
    # File lib/impala/connection.rb
# Reject empty queries and lowercase the leading keyword.
#
# Only surrounding whitespace is trimmed; the rest of the text is sent
# verbatim. The previous split/join(' ') collapsed every whitespace run.
# That corrupted string literals such as 'a  b', and it joined a
# "-- comment" line with the lines after it, so the rest of the
# statement became part of the comment.
#
# @param raw_query [String]
# @return [String] the trimmed query with its first word downcased
# @raise [InvalidQueryError] if the query is empty or only whitespace
def sanitize_query(raw_query)
  query = raw_query.strip
  raise InvalidQueryError.new("Empty query") if query.empty?

  query.sub(/\A\S+/, &:downcase)
end
send_query(sanitized_query, query_options) click to toggle source
    # File lib/impala/connection.rb
# Build a Beeswax query and submit it to the service.
#
# @param sanitized_query [String] query text, already normalised by
#   #sanitize_query
# @param query_options [Hash] :user becomes the Hadoop user; every other
#   pair is sent as "KEY=value" in the query configuration. The caller's
#   hash is not modified.
# @return whatever @service.query returns (#execute treats it as the
#   query handle)
def send_query(sanitized_query, query_options)
  query = Protocol::Beeswax::Query.new
  query.query = sanitized_query

  # Work on a copy: deleting :user from the caller's hash meant that a
  # reused options hash silently lost its user on every later query.
  options = query_options.dup
  user = options.delete(:user)
  query.hadoop_user = user if user
  query.configuration = options.map { |key, value| "#{key.upcase}=#{value}" }

  @service.query(query)
end