class Impala::Connection

This object represents a connection to an Impala server. It can be used to perform queries on the database.

Attributes

host[RW]
port[RW]

Public Class Methods

new(host, port, options={}) click to toggle source

Don't instantiate Connections directly; instead, use {Impala.connect}.

   # File lib/impala/connection.rb
 # Sets up a connection to the Impala server at +host+:+port+ and opens it
 # immediately.
 #
 # Don't instantiate Connections directly; instead, use {Impala.connect}.
 #
 # @param host the server host, later handed to the Thrift socket
 # @param port the server port, later handed to the Thrift socket
 # @param options [Hash] connection options. A shallow copy is stored, so
 #   the caller's hash is not modified. Keys read here: :transport
 #   (falls back to :buffered when unset) and :loggers (falls back to an
 #   empty list).
 def initialize(host, port, options={})
   @host, @port = host, port
   @connected = false

   @options = options.dup
   @options[:transport] = :buffered unless @options[:transport]
   @loggers = @options.fetch(:loggers, [])

   open
 end

Public Instance Methods

close() click to toggle source

Close this connection. It can still be reopened with {#open}.

   # File lib/impala/connection.rb
# Close this connection. It can still be reopened with {#open}.
#
# Does nothing when the connection is not currently marked as connected;
# otherwise closes the transport and clears the connected flag.
def close
  if @connected
    @transport.close
    @connected = false
  end
end
close_handle(handle) click to toggle source
    # File lib/impala/connection.rb
# Closes a query handle by forwarding it to the service client's +close+.
#
# @param handle the query handle to close (such as the one {#execute}
#   obtains when it sends a query)
# @return whatever the service client's +close+ call returns
def close_handle(handle)
  @service.close(handle)
end
execute(raw_query, query_options = {}) click to toggle source

Perform a query and return a cursor for iterating over the results.

@param [String] raw_query the query you want to run
@param [Hash] query_options the options used to set the user and the query configuration; except for :user, see TImpalaQueryOptions in ImpalaService.thrift
@option query_options [String] :user the user who runs the query
@return [Cursor] a cursor for the result rows

    # File lib/impala/connection.rb
# Perform a query and return a cursor for iterating over the results.
#
# The query is sanitized, sent to the server, and the resulting cursor is
# waited on (+wait!+) before it is handed back.
#
# @param raw_query [String] the query you want to run
# @param query_options [Hash] the options to set user and configuration;
#   except for :user, see TImpalaQueryOptions in ImpalaService.thrift
# @option query_options [String] :user the user who runs the query
# @return [Cursor] a cursor for the result rows
# @raise [ConnectionError] if the connection is closed
def execute(raw_query, query_options = {})
  raise ConnectionError, "Connection closed" unless open?

  handle = send_query(sanitize_query(raw_query), query_options)
  Cursor.new(handle, @service, @options).tap(&:wait!)
end
inspect() click to toggle source
   # File lib/impala/connection.rb
# Short description of the connection: class name, host and port, plus a
# " (DISCONNECTED)" marker when the connection is not open.
#
# @return [String]
def inspect
  status = open? ? '' : ' (DISCONNECTED)'
  "#<#{self.class} #{@host}:#{@port}#{status}>"
end
open() click to toggle source

Open the connection if it's currently closed.

   # File lib/impala/connection.rb
# Open the connection if it's currently closed.
#
# Builds the transport for the configured host and port, opens it (turning
# on keepalive from the block passed to the transport's +open+), then
# creates the Impala service client on top of a binary protocol.
def open
  return if @connected

  @transport = thrift_transport(host, port)
  @transport.open { |transport| enable_keepalive(transport) }

  binary_protocol = Thrift::BinaryProtocol.new(@transport)
  @service = Protocol::ImpalaService::Client.new(binary_protocol)
  @connected = true
end
open?() click to toggle source

Returns true if the connection is currently open.

   # File lib/impala/connection.rb
# Returns true if the connection is currently open.
#
# This reports the connected flag maintained by {#open} and {#close}; it
# does not probe the underlying transport.
def open?
  @connected
end
parse_sasl_params(sasl_params) click to toggle source

Processes SASL connection params and returns a hash with symbol keys, or nil if the params are not a hash.

   # File lib/impala/connection.rb
# Processes SASL connection params.
#
# @param sasl_params [Hash, Object] the raw SASL params
# @return [Hash, nil] a new hash with every key converted via +to_sym+
#   (values untouched), or nil when +sasl_params+ is not a Hash
def parse_sasl_params(sasl_params)
  return nil unless sasl_params.kind_of?(Hash)

  # Symbolize keys into a fresh hash so the caller's hash is left alone.
  sasl_params.each_with_object({}) do |(key, value), symbolized|
    symbolized[key.to_sym] = value
  end
end
query(raw_query, query_options = {}) click to toggle source

Perform a query and return all the results. This will load the entire result set into memory, so if you're dealing with lots of rows, {#execute} may work better.

@param [String] raw_query the query you want to run
@param [Hash] query_options the options used to set the user and the query configuration; except for :user, see TImpalaQueryOptions in ImpalaService.thrift
@option query_options [String] :user the user who runs the query
@return [Array<Hash>] an array of hashes, one for each row.

   # File lib/impala/connection.rb
# Perform a query and return all the results. This will load the entire
# result set into memory, so if you're dealing with lots of rows,
# {#execute} may work better.
#
# @param raw_query [String] the query you want to run
# @param query_options [Hash] the options to set user and configuration;
#   except for :user, see TImpalaQueryOptions in ImpalaService.thrift
# @option query_options [String] :user the user who runs the query
# @return [Array<Hash>] an array of hashes, one for each row.
def query(raw_query, query_options = {})
  cursor = execute(raw_query, query_options)
  cursor.fetch_all
end
refresh() click to toggle source

Refresh the metadata store.

   # File lib/impala/connection.rb
# Refresh the metadata store by calling +ResetCatalog+ on the service
# client.
#
# @raise [ConnectionError] if the connection is closed
def refresh
  unless open?
    raise ConnectionError, "Connection closed"
  end

  @service.ResetCatalog
end
thrift_socket(server, port, timeout) click to toggle source
   # File lib/impala/connection.rb
# Builds a Thrift socket for the given server and port and sets its
# timeout.
#
# @param server the host passed to Thrift::Socket.new
# @param port the port passed to Thrift::Socket.new
# @param timeout the value assigned to the socket's +timeout+ (may be nil
#   when no :timeout option was given)
# @return [Thrift::Socket] the configured socket
def thrift_socket(server, port, timeout)
  Thrift::Socket.new(server, port).tap { |sock| sock.timeout = timeout }
end
thrift_transport(server, port) click to toggle source
   # File lib/impala/connection.rb
# Builds the transport selected by the :transport option on top of a
# socket for the given server and port.
#
# * :buffered - a Thrift::BufferedTransport around the socket
# * :sasl     - a SASLTransport, configured from the :sasl_params option;
#   the :mechanism entry is passed separately and the remaining entries are
#   passed as options
#
# @param server the server host
# @param port the server port
# @return the transport object for the configured transport type
# @raise [ArgumentError] if :sasl is selected but :sasl_params is not a Hash
# @raise [RuntimeError] if the transport type is not recognised
def thrift_transport(server, port)
  socket = thrift_socket(server, port, @options[:timeout])

  case @options[:transport]
  when :buffered
    Thrift::BufferedTransport.new(socket)
  when :sasl
    opts = parse_sasl_params(@options[:sasl_params])
    # parse_sasl_params returns nil for anything that is not a Hash; fail
    # with a clear message rather than a NoMethodError on nil below.
    raise ArgumentError, "SASL transport requires the :sasl_params option to be a Hash" if opts.nil?

    mechanism = opts.delete(:mechanism)
    SASLTransport.new(socket, mechanism, opts)
  else
    raise "Unrecognised transport type '#{@options[:transport]}'"
  end
end

Private Instance Methods

enable_keepalive(transport) click to toggle source
    # File lib/impala/connection.rb
# Turns on TCP keepalive for the socket behind +transport+.
#
# SO_KEEPALIVE is always set. When the platform's Socket library defines
# SOL_TCP, the keepalive tuning options it also defines are applied:
# TCP_KEEPIDLE = 60, TCP_KEEPINTVL = 10, TCP_KEEPCNT = 5.
#
# @param transport an object whose +handle+ returns the socket to configure
def enable_keepalive(transport)
  socket = transport.handle
  log_debug("Enabling KEEPALIVE...")
  socket.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_KEEPALIVE, true)

  # Apparently Mac OS X (Darwin) doesn't implement the SOL_TCP options
  # below, so there we just hope plain keepalive works. Production runs
  # Dockerized on Linux, where these options should be available.
  return unless defined?(::Socket::SOL_TCP)

  tuning = {}
  tuning[::Socket::TCP_KEEPIDLE] = 60 if defined?(::Socket::TCP_KEEPIDLE)
  tuning[::Socket::TCP_KEEPINTVL] = 10 if defined?(::Socket::TCP_KEEPINTVL)
  tuning[::Socket::TCP_KEEPCNT] = 5 if defined?(::Socket::TCP_KEEPCNT)

  log_debug("Also enabling: #{tuning.inspect}")
  tuning.each { |opt, value| socket.setsockopt(::Socket::SOL_TCP, opt, value) }
end
log_debug(message) click to toggle source
    # File lib/impala/connection.rb
# Sends +message+ at debug level to every configured logger.
#
# @param message the message handed to each logger's +debug+
# @return the list of loggers that was iterated
def log_debug(message)
  @loggers.each { |logger| logger.debug(message) }
end
sanitize_query(raw_query) click to toggle source
    # File lib/impala/connection.rb
# Normalizes a raw query before it is sent: surrounding whitespace is
# removed and the leading command word is lowercased.
#
# Everything after the command is kept verbatim. Collapsing interior
# whitespace would alter string literals and turn newlines into spaces,
# which lets a "--" line comment swallow the rest of the statement.
#
# @param raw_query [String, nil] the query as given by the caller
# @return [String] the query with a lowercased leading command
# @raise [InvalidQueryError] if the query is nil, empty or only whitespace
def sanitize_query(raw_query)
  query = raw_query.to_s.strip
  raise InvalidQueryError, "Empty query" if query.empty?

  # After strip the string starts with a non-whitespace run: the command.
  query.sub(/\A\S+/) { |command| command.downcase }
end
send_query(sanitized_query, query_options) click to toggle source
    # File lib/impala/connection.rb
# Builds a Beeswax query from the sanitized query text and options and
# submits it through the service client.
#
# The :user option, when truthy, becomes the query's hadoop_user. Every
# other option is sent as a "KEY=value" configuration entry with the key
# upcased. The caller's hash is not modified.
#
# @param sanitized_query [String] the query text to send
# @param query_options [Hash, nil] the options to set user and configuration
# @return whatever the service client's +query+ call returns (used by
#   {#execute} as the query handle)
def send_query(sanitized_query, query_options)
  # Work on a copy: removing :user from the caller's hash would silently
  # drop the user if the same options hash is reused for another query.
  options = (query_options || {}).dup
  user = options.delete(:user)

  query = Protocol::Beeswax::Query.new
  query.query = sanitized_query
  query.hadoop_user = user if user
  query.configuration = options.map { |key, value| "#{key.upcase}=#{value}" }

  @service.query(query)
end