class RBHive::TCLIConnection

Attributes

client[R]

Public Class Methods

new(server, port = 10_000, options = {}, logger = StdOutLogger.new) click to toggle source
    # File lib/rbhive/t_c_l_i_connection.rb
 # Builds a HiveServer2 (TCLIService) connection object. The Thrift transport,
 # binary protocol and client are created here, but the transport is not
 # opened; call #open (and #open_session) before issuing queries.
 #
 # server  - host name of the HiveServer2 instance.
 # port    - port number (default 10_000).
 # options - Hash of connection options (nil accepted for backwards compatibility):
 #           :transport    - :buffered (default), :sasl or :http
 #           :sasl_params  - SASL parameters; required when :transport is :sasl
 #           :hive_version - key into HIVE_THRIFT_MAPPING (default 10)
 #           :timeout      - socket timeout handed to thrift_socket (default 1800 seconds)
 #           The caller's hash is not modified.
 # logger  - object responding to #info (default StdOutLogger.new).
 #
 # Raises RuntimeError if options is not a Hash, if :sasl is requested without
 # :sasl_params, if the Hive version is unknown (thrift_hive_protocol), or if the
 # transport type is unknown (thrift_transport).
 def initialize(server, port = 10_000, options = {}, logger = StdOutLogger.new)
   options ||= {} # backwards compatibility
   raise "'options' parameter must be a hash" unless options.is_a?(Hash)
   # Work on a copy: deleting :sasl_params and filling in defaults must not leak
   # back into the caller's hash (reusing one options hash for a second
   # connection would otherwise silently lose its SASL settings).
   options = options.dup
   @sasl_params = options.delete(:sasl_params) || {}

   if options[:transport] == :sasl && @sasl_params.empty?
     raise ":transport is set to :sasl, but no :sasl_params option was supplied"
   end

   # Defaults to buffered transport, Hive 0.10, 1800 second timeout
   options[:transport]     ||= :buffered
   options[:hive_version]  ||= 10
   options[:timeout]       ||= 1800
   @options = options
   # Look up the appropriate Thrift protocol version for the supplied Hive version
   @thrift_protocol_version = thrift_hive_protocol(options[:hive_version])

   @logger = logger
   @transport = thrift_transport(server, port)
   @protocol = Thrift::BinaryProtocol.new(@transport)
   @client = Hive2::Thrift::TCLIService::Client.new(@protocol)
   @session = nil
   @logger.info("Connecting to HiveServer2 #{server} on port #{port}")
 end

Public Instance Methods

add_columns(schema) click to toggle source
    # File lib/rbhive/t_c_l_i_connection.rb
# Runs the ADD COLUMNS statement produced by +schema+ and returns the
# execution result.
def add_columns(schema)
  statement = schema.add_columns_statement
  execute(statement)
end
async_cancel(handles) click to toggle source
    # File lib/rbhive/t_c_l_i_connection.rb
# Asks the server to cancel the asynchronous operation identified by +handles+
# and returns the raw CancelOperation response.
def async_cancel(handles)
  cancel_request = prepare_cancel_request(handles)
  @client.CancelOperation(cancel_request)
end
async_close_session(handles) click to toggle source
    # File lib/rbhive/t_c_l_i_connection.rb
# Closes the session referenced by handles[:session] after checking that the
# handles hash is complete; returns the raw CloseSession response.
def async_close_session(handles)
  validate_handles!(handles)
  close_request = Hive2::Thrift::TCloseSessionReq.new(sessionHandle: handles[:session])
  @client.CloseSession(close_request)
end
async_execute(query) click to toggle source

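Executes a query asynchronously and returns a hash of handles (:session, :guid, :secret) that can later be used to poll, fetch, or cancel it, or to close its session.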

    # File lib/rbhive/t_c_l_i_connection.rb
# Submits +query+ with runAsync enabled on the current session and returns the
# handles (:session, :guid, :secret) needed to poll, fetch or cancel it later.
# Requires an open session (reads @session.sessionHandle).
# Raises via raise_error_if_failed! if the server rejects the statement.
def async_execute(query)
  @logger.info("Executing query asynchronously: #{query}")
  request = Hive2::Thrift::TExecuteStatementReq.new(
    sessionHandle: @session.sessionHandle,
    statement: query,
    runAsync: true
  )
  response = @client.ExecuteStatement(request)
  raise_error_if_failed!(response)

  operation_id = response.operationHandle.operationId
  {
    session: @session.sessionHandle,
    guid: operation_id.guid,
    secret: operation_id.secret
  }
end
async_fetch(handles, max_rows = 100) click to toggle source

Fetches up to max_rows rows from a finished query that was started with async_execute.

    # File lib/rbhive/t_c_l_i_connection.rb
# Fetches up to +max_rows+ rows from the start of the result set of a finished
# asynchronous query.
#
# handles  - Hash with :session, :guid and :secret, as returned by async_execute.
# max_rows - maximum number of rows to return (default 100).
#
# Returns the rows of the first fetch.
# Raises RuntimeError if the query is not in the :finished state.
def async_fetch(handles, max_rows = 100)
  # Query the state once: a single GetOperationStatus round trip, and the state
  # reported in the error is the same one that failed the check.
  state = async_state(handles)
  raise "Can't perform fetch on a query in state: #{state}" unless state == :finished

  # Fetch from the beginning of the result set
  fetch_rows(prepare_operation_handle(handles), :first, max_rows)
end
async_fetch_in_batch(handles, batch_size = 1000) { |rows| ... } click to toggle source

Fetches the results of a finished query that was started with async_execute in batches of batch_size rows, and yields each batch to the given block as an array of rows.

    # File lib/rbhive/t_c_l_i_connection.rb
# Fetches the results of a finished asynchronous query in batches of
# +batch_size+ rows and yields each non-empty batch to the block.
#
# handles    - Hash with :session, :guid and :secret, as returned by async_execute.
# batch_size - maximum rows requested per fetch (default 1000).
#
# Raises RuntimeError if no block is given or the query is not :finished.
def async_fetch_in_batch(handles, batch_size = 1000, &block)
  raise "No block given for the batch fetch request!" unless block_given?

  # Single state lookup: one RPC, and the error reports the state actually checked
  state = async_state(handles)
  raise "Can't perform fetch on a query in state: #{state}" unless state == :finished

  # The operation handle does not change between batches, so build it once
  op_handle = prepare_operation_handle(handles)
  loop do
    rows = fetch_rows(op_handle, :next, batch_size)
    break if rows.empty?
    yield rows
  end
end
async_is_cancelled?(handles) click to toggle source
    # File lib/rbhive/t_c_l_i_connection.rb
# True when the server reports the operation as cancelled.
def async_is_cancelled?(handles)
  state = async_state(handles)
  state == :cancelled
end
async_is_complete?(handles) click to toggle source

Is the query complete?

    # File lib/rbhive/t_c_l_i_connection.rb
# True once the server reports the operation as finished.
def async_is_complete?(handles)
  state = async_state(handles)
  state == :finished
end
async_is_failed?(handles) click to toggle source

Has the query failed?

    # File lib/rbhive/t_c_l_i_connection.rb
# True when the server reports the operation in its error state.
def async_is_failed?(handles)
  state = async_state(handles)
  state == :error
end
async_is_running?(handles) click to toggle source

Is the query actually running?

    # File lib/rbhive/t_c_l_i_connection.rb
# True while the server reports the operation as running.
def async_is_running?(handles)
  state = async_state(handles)
  state == :running
end
async_state(handles) click to toggle source

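Asks the server for the current state of the operation identified by handles and maps it to a symbol (:finished, :running, :error, etc.).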

    # File lib/rbhive/t_c_l_i_connection.rb
# Polls GetOperationStatus for the operation identified by +handles+ and maps
# the Thrift TOperationState value onto a symbol. Unrecognised values map to
# :state_not_in_protocol; a missing state raises RuntimeError.
def async_state(handles)
  request = Hive2::Thrift::TGetOperationStatusReq.new(operationHandle: prepare_operation_handle(handles))
  state = @client.GetOperationStatus(request).operationState
  states = Hive2::Thrift::TOperationState

  case state
  when states::FINISHED_STATE    then :finished
  when states::INITIALIZED_STATE then :initialized
  when states::RUNNING_STATE     then :running
  when states::CANCELED_STATE    then :cancelled
  when states::CLOSED_STATE      then :closed
  when states::ERROR_STATE       then :error
  when states::UKNOWN_STATE      then :unknown # (sic) constant name as generated from the Thrift IDL
  when states::PENDING_STATE     then :pending
  when nil
    raise "No operation state found for handles - has the session been closed?"
  else
    :state_not_in_protocol
  end
end
close() click to toggle source
    # File lib/rbhive/t_c_l_i_connection.rb
# Closes the underlying Thrift transport and returns whatever it returns.
def close
  transport = @transport
  transport.close
end
close_session() click to toggle source
    # File lib/rbhive/t_c_l_i_connection.rb
# Sends CloseSession for the current session and forgets it locally.
# Returns nil.
def close_session
  request = prepare_close_session
  @client.CloseSession(request)
  @session = nil
end
create_table(schema) click to toggle source
    # File lib/rbhive/t_c_l_i_connection.rb
# Runs the CREATE TABLE statement produced by +schema+ and returns the
# execution result.
def create_table(schema)
  statement = schema.create_table_statement
  execute(statement)
end
drop_table(name) click to toggle source
    # File lib/rbhive/t_c_l_i_connection.rb
# Drops the table called +name+ (a String, or a TableSchema whose #name is used)
# and returns the execution result.
#
# The name is wrapped in backticks as a quoted identifier; embedded backticks
# are doubled, which is how Hive escapes a backtick inside a quoted identifier.
# Without this, a name containing a backtick would end the identifier early
# and the remaining text would be parsed as part of the statement.
def drop_table(name)
  name = name.name if name.is_a?(TableSchema)
  quoted = name.to_s.gsub("`", "``")
  execute("DROP TABLE `#{quoted}`")
end
execute(query) click to toggle source
    # File lib/rbhive/t_c_l_i_connection.rb
# Runs +query+ synchronously on the current session and returns the raw
# ExecuteStatement response, raising via raise_error_if_failed! when the
# server reports a failure.
def execute(query)
  @logger.info("Executing Hive Query: #{query}")
  client.ExecuteStatement(prepare_execute_statement(query)).tap do |response|
    raise_error_if_failed!(response)
  end
end
explain(query) click to toggle source

Runs EXPLAIN on the supplied query on the server and returns the output as an ExplainResult. (On Hive 0.12 this only works with the patch from issues.apache.org/jira/browse/HIVE-5492.)

    # File lib/rbhive/t_c_l_i_connection.rb
# Runs EXPLAIN for +query+, collects the :Explain value of every returned row,
# and wraps the flattened list in an ExplainResult.
def explain(query)
  plan = []
  fetch_in_batch("EXPLAIN " + query) do |batch|
    batch.each { |row| plan << row[:Explain] }
  end
  ExplainResult.new(plan.flatten)
end
fetch(query, max_rows = 100) click to toggle source

Performs a query on the server, fetches up to max_rows rows and returns them as an array.

    # File lib/rbhive/t_c_l_i_connection.rb
# Executes +query+ and returns up to +max_rows+ rows from the start of its
# result set.
#
# Raises RBHive::TCLIConnectionError (via execute) if the statement fails.
def fetch(query, max_rows = 100)
  # execute already checks the response status, so there is no second check here
  op_handle = execute(query).operationHandle
  fetch_rows(op_handle, :first, max_rows)
end
fetch_in_batch(query, batch_size = 1000) { |rows| ... } click to toggle source

Performs a query on the server, fetches the results in batches of batch_size rows and yields the result batches to a given block as arrays of rows.

    # File lib/rbhive/t_c_l_i_connection.rb
# Executes +query+ and yields its result set to the block in batches of up to
# +batch_size+ rows, stopping at the first empty batch.
#
# Raises RuntimeError if no block is given, and RBHive::TCLIConnectionError
# (via execute) if the statement fails.
def fetch_in_batch(query, batch_size = 1000, &block)
  raise "No block given for the batch fetch request!" unless block_given?

  # execute already raises if the statement failed
  op_handle = execute(query).operationHandle

  # Keep fetching forward until the server returns an empty batch
  loop do
    rows = fetch_rows(op_handle, :next, batch_size)
    break if rows.empty?
    yield rows
  end
end
fetch_rows(op_handle, orientation = :first, max_rows = 1000) click to toggle source

Fetches up to max_rows rows from the result set of op_handle, using the given fetch orientation.

    # File lib/rbhive/t_c_l_i_connection.rb
# Issues a single FetchResults call for +op_handle+ and returns the row list
# from the response; raises via raise_error_if_failed! on a failed status.
def fetch_rows(op_handle, orientation = :first, max_rows = 1000)
  request = prepare_fetch_results(op_handle, orientation, max_rows)
  response = @client.FetchResults(request)
  raise_error_if_failed!(response)
  response.results.rows
end
get_column_info(op_handle) click to toggle source
    # File lib/rbhive/t_c_l_i_connection.rb
# Returns a two-element array: the column names of +op_handle+'s result set,
# and the primitive type of each column (taken from the first type entry).
def get_column_info(op_handle)
  columns = get_schema_for(op_handle).columns
  names = columns.map(&:columnName)
  types = columns.map { |column| column.typeDesc.types.first.primitiveEntry.type }
  [names, types]
end
method_missing(meth, *args) click to toggle source
    # File lib/rbhive/t_c_l_i_connection.rb
# Forwards any method this connection does not define to the Thrift
# TCLIService client (e.g. raw RPCs such as GetCatalogs), including a block
# if one is given.
def method_missing(meth, *args, &block)
  client.send(meth, *args, &block)
end

# Keeps respond_to? consistent with method_missing: the connection reports that
# it responds to whatever the underlying client responds to.
def respond_to_missing?(meth, include_private = false)
  client.respond_to?(meth, include_private) || super
end
open() click to toggle source
    # File lib/rbhive/t_c_l_i_connection.rb
# Opens the underlying Thrift transport and returns whatever it returns.
def open
  transport = @transport
  transport.open
end
open_session() click to toggle source
    # File lib/rbhive/t_c_l_i_connection.rb
# Opens a server-side session using the negotiated Thrift protocol version,
# stores the OpenSession response in @session and returns it.
def open_session
  request = prepare_open_session(@thrift_protocol_version)
  @session = @client.OpenSession(request)
end
parse_sasl_params(sasl_params) click to toggle source

Processes SASL connection params and returns a hash with symbol keys, or nil if sasl_params is not a hash.

    # File lib/rbhive/t_c_l_i_connection.rb
# Returns a copy of +sasl_params+ with every key converted via #to_sym, or nil
# when +sasl_params+ is not a Hash.
def parse_sasl_params(sasl_params)
  return nil unless sasl_params.kind_of?(Hash)

  sasl_params.each_with_object({}) do |(key, value), symbolized|
    symbolized[key.to_sym] = value
  end
end
priority=(priority) click to toggle source
    # File lib/rbhive/t_c_l_i_connection.rb
# Sets the MapReduce job priority for subsequent queries on this session.
def priority=(priority)
  setting = "mapred.job.priority"
  set(setting, priority)
end
queue=(queue) click to toggle source
    # File lib/rbhive/t_c_l_i_connection.rb
# Sets the MapReduce job queue for subsequent queries on this session.
def queue=(queue)
  setting = "mapred.job.queue.name"
  set(setting, queue)
end
replace_columns(schema) click to toggle source
    # File lib/rbhive/t_c_l_i_connection.rb
# Runs the REPLACE COLUMNS statement produced by +schema+ and returns the
# execution result.
def replace_columns(schema)
  statement = schema.replace_columns_statement
  execute(statement)
end
session() click to toggle source
    # File lib/rbhive/t_c_l_i_connection.rb
# The Thrift session handle of the open session, or the (falsy) @session
# value itself when no session is open.
def session
  @session ? @session.sessionHandle : @session
end
set(name,value) click to toggle source
    # File lib/rbhive/t_c_l_i_connection.rb
# Issues "SET name=value" on the current session (after logging it) and
# returns the execution result.
def set(name,value)
  assignment = "#{name}=#{value}"
  @logger.info("Setting #{assignment}")
  execute("SET #{assignment}")
end
thrift_hive_protocol(version) click to toggle source
    # File lib/rbhive/t_c_l_i_connection.rb
# Looks up the Thrift protocol version for a Hive version in
# HIVE_THRIFT_MAPPING; raises RuntimeError when there is no (truthy) entry.
def thrift_hive_protocol(version)
  protocol = HIVE_THRIFT_MAPPING[version]
  raise "Invalid Hive version" unless protocol
  protocol
end
thrift_socket(server, port, timeout) click to toggle source
    # File lib/rbhive/t_c_l_i_connection.rb
# Builds a Thrift::Socket for server:port with its timeout set; the socket is
# not opened here.
def thrift_socket(server, port, timeout)
  Thrift::Socket.new(server, port).tap { |sock| sock.timeout = timeout }
end
thrift_transport(server, port) click to toggle source
    # File lib/rbhive/t_c_l_i_connection.rb
# Builds the Thrift transport selected by @options[:transport]:
#   :buffered - BufferedTransport over a timed socket
#   :sasl     - SaslClientTransport over a timed socket, using @sasl_params
#   :http     - HTTPClientTransport to http://server:port/cliservice
# Raises RuntimeError naming the offending value for any other transport type.
def thrift_transport(server, port)
  transport_type = @options[:transport]
  @logger.info("Initializing transport #{transport_type}")
  case transport_type
  when :buffered
    Thrift::BufferedTransport.new(thrift_socket(server, port, @options[:timeout]))
  when :sasl
    Thrift::SaslClientTransport.new(thrift_socket(server, port, @options[:timeout]),
                                    parse_sasl_params(@sasl_params))
  when :http
    Thrift::HTTPClientTransport.new("http://#{server}:#{port}/cliservice")
  else
    # Interpolate the configured value; there is no `transport` method or local here
    raise "Unrecognised transport type '#{transport_type}'"
  end
end
yield_hash_rows(op_handle, columns, convertors) { |h| ... } click to toggle source
    # File lib/rbhive/t_c_l_i_connection.rb
# Streams every row of +op_handle+'s result set as a Hash keyed by column name.
# The first batch is fetched with fetch_rows' defaults, then :next batches
# follow until an empty batch is returned.
#
# columns    - column names, in result-set order.
# convertors - parallel array of callables (or nil); when present, a
#              convertor is applied via #[] to each non-nil value.
#
# Yields one Hash per row; nil cell values stay nil and skip conversion.
def yield_hash_rows(op_handle, columns, convertors)
  # [position, column name, convertor] triples, built once for all rows
  indexed_cols = columns.zip(convertors).map.with_index { |(col, conv), idx| [idx, col, conv] }

  rows = fetch_rows(op_handle)
  until rows.empty?
    rows.each do |row|
      vals = row.colVals
      h = {}
      indexed_cols.each do |idx, col, conv|
        v = vals[idx].get_value.value
        h[col] = v.nil? ? nil : (conv ? conv[v] : v)
      end
      yield h
    end
    rows = fetch_rows(op_handle, :next)
  end
end

Private Instance Methods

get_schema_for(handle) click to toggle source
    # File lib/rbhive/t_c_l_i_connection.rb
# Fetches the result-set metadata for +handle+ and returns its schema.
def get_schema_for(handle)
  request = ::Hive2::Thrift::TGetResultSetMetadataReq.new(operationHandle: handle)
  client.GetResultSetMetadata(request).schema
end
prepare_cancel_request(handles) click to toggle source
    # File lib/rbhive/t_c_l_i_connection.rb
# Builds a TCancelOperationReq for the operation identified by +handles+.
def prepare_cancel_request(handles)
  operation_handle = prepare_operation_handle(handles)
  Hive2::Thrift::TCancelOperationReq.new(operationHandle: operation_handle)
end
prepare_close_session() click to toggle source
    # File lib/rbhive/t_c_l_i_connection.rb
# Builds a TCloseSessionReq for the currently open session handle.
def prepare_close_session
  handle = session
  ::Hive2::Thrift::TCloseSessionReq.new(sessionHandle: handle)
end
prepare_execute_statement(query) click to toggle source
    # File lib/rbhive/t_c_l_i_connection.rb
# Builds a synchronous TExecuteStatementReq for +query+ on the current session,
# always overlaying impala.resultset.cache.size=100000.
def prepare_execute_statement(query)
  ::Hive2::Thrift::TExecuteStatementReq.new(
    sessionHandle: session,
    statement: query.to_s,
    confOverlay: { "impala.resultset.cache.size" => "100000" }
  )
end
prepare_fetch_results(handle, orientation=:first, rows=100) click to toggle source
    # File lib/rbhive/t_c_l_i_connection.rb
# Builds a TFetchResultsReq for +handle+.
#
# orientation - symbol naming a TFetchOrientation, e.g. :first or :next
#               (mapped to FETCH_FIRST / FETCH_NEXT).
# rows        - maximum number of rows to request (default 100).
#
# Raises ArgumentError if the orientation is not defined by TFetchOrientation.
def prepare_fetch_results(handle, orientation=:first, rows=100)
  orientation_name = "FETCH_#{orientation.to_s.upcase}"
  orientations = ::Hive2::Thrift::TFetchOrientation
  unless orientations::VALUE_MAP.values.include?(orientation_name)
    raise ArgumentError, "Invalid orientation: #{orientation.inspect}"
  end

  ::Hive2::Thrift::TFetchResultsReq.new(
    operationHandle: handle,
    # Plain constant lookup (no eval); inherit=false keeps it inside TFetchOrientation
    orientation: orientations.const_get(orientation_name, false),
    maxRows: rows
  )
end
prepare_open_session(client_protocol) click to toggle source
    # File lib/rbhive/t_c_l_i_connection.rb
# Builds a TOpenSessionReq for +client_protocol+, seeded with @sasl_params
# when any were supplied (an empty array otherwise).
def prepare_open_session(client_protocol)
  session_args = @sasl_params.empty? ? [] : @sasl_params
  ::Hive2::Thrift::TOpenSessionReq.new(session_args).tap do |request|
    request.client_protocol = client_protocol
  end
end
prepare_operation_handle(handles) click to toggle source
    # File lib/rbhive/t_c_l_i_connection.rb
# Rebuilds an EXECUTE_STATEMENT TOperationHandle (hasResultSet: false) from the
# :guid and :secret in +handles+, after validating the handles hash.
def prepare_operation_handle(handles)
  validate_handles!(handles)
  operation_id = Hive2::Thrift::THandleIdentifier.new(guid: handles[:guid], secret: handles[:secret])
  Hive2::Thrift::TOperationHandle.new(
    operationId: operation_id,
    operationType: Hive2::Thrift::TOperationType::EXECUTE_STATEMENT,
    hasResultSet: false
  )
end
raise_error_if_failed!(result) click to toggle source

Raises an exception if given operation result is a failure

    # File lib/rbhive/t_c_l_i_connection.rb
# Raises RBHive::TCLIConnectionError unless +result+'s status is a success.
#
# Both SUCCESS_STATUS and SUCCESS_WITH_INFO_STATUS count as success: the
# latter is a successful call for which the server attached extra information,
# so it must not be reported as an error. The server's errorMessage becomes the
# exception message, falling back to 'Execution failed!' when it is nil or empty.
def raise_error_if_failed!(result)
  status = result.status
  success_codes = [
    Hive2::Thrift::TStatusCode::SUCCESS_STATUS,
    Hive2::Thrift::TStatusCode::SUCCESS_WITH_INFO_STATUS
  ]
  return if success_codes.include?(status.statusCode)

  error_message = status.errorMessage
  error_message = 'Execution failed!' if error_message.nil? || error_message.empty?
  raise RBHive::TCLIConnectionError, error_message
end
validate_handles!(handles) click to toggle source
    # File lib/rbhive/t_c_l_i_connection.rb
# Ensures +handles+ carries :guid, :secret and :session keys; raises
# RuntimeError describing the hash otherwise. Returns nil when valid.
def validate_handles!(handles)
  required_keys = [:guid, :secret, :session]
  return if required_keys.all? { |key| handles.has_key?(key) }

  raise "Invalid handles hash: #{handles.inspect}"
end