class Impala::Connection
This object represents a connection to an Impala
server. It can be used to perform queries on the database.
Attributes
Public Class Methods
Don't instantiate Connections directly; instead, use {Impala.connect}.
# File lib/impala/connection.rb, line 8
# Stores the connection settings and immediately opens the connection.
#
# The options hash is copied, so filling in the :transport default does not
# modify the caller's hash.
#
# @param host the server host, stored as given
# @param port the server port, stored as given
# @param options [Hash] connection options; :transport falls back to
#   :buffered when unset or nil, and :loggers falls back to an empty list
def initialize(host, port, options = {})
  @connected = false
  @host = host
  @port = port
  @options = options.dup.tap { |opts| opts[:transport] ||= :buffered }
  @loggers = @options.fetch(:loggers) { [] }
  open
end
Public Instance Methods
Close this connection. It can still be reopened with {#open}.
# File lib/impala/connection.rb, line 70
# Closes the underlying transport and marks the connection as closed.
# Does nothing when the connection is not currently open.
#
# @return [false, nil] false after closing, nil when there was nothing to close
def close
  if @connected
    @transport.close
    @connected = false
  end
end
# File lib/impala/connection.rb, line 117
# Asks the service client to close the given handle.
#
# @param handle the handle to pass to the service's close call
# @return whatever the service's close call returns
def close_handle(handle)
  service = @service
  service.close(handle)
end
Perform a query and return a cursor for iterating over the results. @param [String] raw_query the query you want to run @param [Hash] query_options the user and the configuration options to set for this query;
for every key except :user, see TImpalaQueryOptions in ImpalaService.thrift
@option query_options [String] :user the user who runs the query @return [Cursor] a cursor for the result rows
# File lib/impala/connection.rb, line 106
# Sanitizes and sends the query, then returns a cursor over its results
# after waiting on it.
#
# @param raw_query [String] the query text as supplied by the caller
# @param query_options [Hash] options handed on to send_query
# @return [Cursor] the cursor, returned after its wait! call completes
# @raise [ConnectionError] when the connection is not open
def execute(raw_query, query_options = {})
  raise ConnectionError, "Connection closed" unless open?

  handle = send_query(sanitize_query(raw_query), query_options)
  Cursor.new(handle, @service, @options).tap { |cursor| cursor.wait! }
end
# File lib/impala/connection.rb, line 18
# Short description of the connection: class name, host:port, and a
# " (DISCONNECTED)" marker when the connection is not open.
#
# @return [String]
def inspect
  status = open? ? '' : ' (DISCONNECTED)'
  "#<#{self.class} #{@host}:#{@port}#{status}>"
end
Open the connection if it's currently closed.
# File lib/impala/connection.rb, line 23
# Builds the transport, opens it, and creates the service client on top of a
# binary protocol. Does nothing when the connection is already open.
#
# The block given to the transport's open call hands its argument to
# enable_keepalive.
#
# @return [true, nil] true once connected, nil when already connected
def open
  unless @connected
    @transport = thrift_transport(host, port)
    @transport.open { |transport| enable_keepalive(transport) }

    @service = Protocol::ImpalaService::Client.new(Thrift::BinaryProtocol.new(@transport))
    @connected = true
  end
end
Returns true if the connection is currently open.
# File lib/impala/connection.rb, line 78
# Reports the connection state flag maintained by open and close.
#
# @return the current value of the connected flag
def open?
  @connected
end
Processes SASL connection params and returns a hash with symbol keys, or nil if the params are not a hash.
# File lib/impala/connection.rb, line 58
# Returns a copy of the SASL params with every key converted to a symbol.
#
# @param sasl_params [Hash, Object] the raw SASL params
# @return [Hash, nil] a new hash with symbol keys, or nil when the argument
#   is not a Hash
def parse_sasl_params(sasl_params)
  return nil unless sasl_params.kind_of?(Hash)

  # Symbolize keys into a fresh hash; the input is left untouched.
  sasl_params.each_with_object({}) do |(key, value), symbolized|
    symbolized[key.to_sym] = value
  end
end
Perform a query and return all the results. This will load the entire result set into memory, so if you're dealing with lots of rows, {#execute} may work better. @param [String] raw_query the query you want to run @param [Hash] query_options the user and the configuration options to set for this query;
for every key except :user, see TImpalaQueryOptions in ImpalaService.thrift
@option query_options [String] :user the user who runs the query @return [Array<Hash>] an array of hashes, one for each row.
# File lib/impala/connection.rb, line 96
# Runs the query through execute and fetches every row from the cursor.
#
# @param raw_query [String] the query text as supplied by the caller
# @param query_options [Hash] options handed on to execute
# @return the result of the cursor's fetch_all call
def query(raw_query, query_options = {})
  cursor = execute(raw_query, query_options)
  cursor.fetch_all
end
Refresh the metadata store.
# File lib/impala/connection.rb, line 83
# Calls ResetCatalog on the service client.
#
# @return whatever the service's ResetCatalog call returns
# @raise [ConnectionError] when the connection is not open
def refresh
  if open?
    @service.ResetCatalog()
  else
    raise ConnectionError, "Connection closed"
  end
end
# File lib/impala/connection.rb, line 51
# Creates a Thrift socket for the given server and port and sets its timeout.
#
# @param server the host to connect to
# @param port the port to connect to
# @param timeout the value assigned to the socket's timeout (may be nil)
# @return [Thrift::Socket] the configured socket
def thrift_socket(server, port, timeout)
  Thrift::Socket.new(server, port).tap { |sock| sock.timeout = timeout }
end
# File lib/impala/connection.rb, line 36
# Builds the transport selected by the :transport option on top of a socket
# configured with the :timeout option.
#
# @param server the host to connect to
# @param port the port to connect to
# @return a Thrift::BufferedTransport for :buffered, or a SASLTransport for
#   :sasl
# @raise [ArgumentError] when :sasl is selected but :sasl_params is not a Hash
# @raise [RuntimeError] when the transport type is not recognised
def thrift_transport(server, port)
  socket = thrift_socket(server, port, @options[:timeout])

  case @options[:transport]
  when :buffered
    Thrift::BufferedTransport.new(socket)
  when :sasl
    opts = parse_sasl_params(@options[:sasl_params])
    # parse_sasl_params returns nil for anything that is not a Hash; fail with
    # a clear message instead of a NoMethodError on nil below.
    if opts.nil?
      raise ArgumentError, "The :sasl transport requires the :sasl_params option to be a Hash"
    end

    # The mechanism is passed separately, so it is removed from the params.
    mechanism = opts.delete(:mechanism)
    SASLTransport.new(socket, mechanism, opts)
  else
    raise "Unrecognised transport type '#{@options[:transport]}'"
  end
end
Private Instance Methods
# File lib/impala/connection.rb, line 143
# Turns on SO_KEEPALIVE for the transport's socket handle and, where the
# platform defines them, tunes the TCP keepalive idle/interval/count options.
#
# @param transport an object whose handle responds to setsockopt
# @return [Hash, nil] the TCP-level options that were applied, or nil when
#   SOL_TCP is not defined on this platform
def enable_keepalive(transport)
  sock = transport.handle
  log_debug("Enabling KEEPALIVE...")
  sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_KEEPALIVE, true)

  # Mac OS X (Darwin) apparently does not implement the SOL_TCP options, so
  # there we rely on plain keepalive. Production runs on Linux (Docker),
  # where these options should be available.
  return unless defined?(::Socket::SOL_TCP)

  tcp_opts = {}
  tcp_opts[::Socket::TCP_KEEPIDLE] = 60 if defined?(::Socket::TCP_KEEPIDLE)
  tcp_opts[::Socket::TCP_KEEPINTVL] = 10 if defined?(::Socket::TCP_KEEPINTVL)
  tcp_opts[::Socket::TCP_KEEPCNT] = 5 if defined?(::Socket::TCP_KEEPCNT)

  log_debug("Also enabling: #{tcp_opts.inspect}")
  tcp_opts.each { |opt, value| sock.setsockopt(::Socket::SOL_TCP, opt, value) }
end
# File lib/impala/connection.rb, line 174
# Sends the message at debug level to every configured logger.
#
# @param message the text to log
# @return the list of loggers
def log_debug(message)
  @loggers.each { |target| target.debug(message) }
end
# File lib/impala/connection.rb, line 123
# Trims surrounding whitespace from the query and lowercases its first word
# (the command), leaving the rest of the text exactly as written.
#
# Interior whitespace is preserved on purpose. Collapsing it would alter
# string literals containing runs of spaces, and would fold the line after a
# "--" comment into that comment.
#
# @param raw_query [String] the query text as supplied by the caller
# @return [String] the trimmed query with a lowercased command word
# @raise [InvalidQueryError] when the query is empty or only whitespace
def sanitize_query(raw_query)
  trimmed = raw_query.strip
  raise InvalidQueryError, "Empty query" if trimmed.empty?

  # Only the leading command word is normalised; everything after it is kept.
  trimmed.sub(/\A\S+/) { |command| command.downcase }
end
# File lib/impala/connection.rb, line 131
# Builds a Beeswax query from the sanitized text and options and submits it
# through the service client.
#
# The options are copied before :user is removed, so the caller's hash is
# left unchanged and can be reused for further queries.
#
# @param sanitized_query [String] the query text to send
# @param query_options [Hash] :user becomes the query's hadoop_user when
#   truthy; every other entry becomes a "KEY=value" configuration string
# @return whatever the service's query call returns
def send_query(sanitized_query, query_options)
  options = query_options.dup
  # Always remove :user so it never ends up as a "USER=" configuration entry,
  # even when its value is nil.
  user = options.delete(:user)

  query = Protocol::Beeswax::Query.new
  query.query = sanitized_query
  query.hadoop_user = user if user
  query.configuration = options.map { |key, value| "#{key.upcase}=#{value}" }

  @service.query(query)
end