class RBHive::TCLIConnection
Attributes
Public Class Methods
# Builds the Thrift transport, protocol and client for a HiveServer2 endpoint.
# The transport is only constructed here; nothing in this method opens it or
# starts a session.
#
# server  - host name passed to the transport
# port    - port number (default 10_000)
# options - Hash (nil is accepted and treated as {}). Recognised keys:
#           :transport    - :buffered (default), :sasl or :http
#           :sasl_params  - required, non-empty, when :transport is :sasl
#           :hive_version - defaults to 10
#           :timeout      - defaults to 1800
# logger  - object responding to #info
#
# Raises RuntimeError when options is not a Hash, when :sasl is requested
# without :sasl_params, or (via thrift_hive_protocol) when the Hive version
# is not recognised.
def initialize(server, port = 10_000, options = {}, logger = StdOutLogger.new)
  options ||= {} # backwards compatibility
  raise "'options' parameter must be a hash" unless options.is_a?(Hash)

  # Work on a copy: the delete and the defaults below must not alter the
  # caller's hash, otherwise reusing it for a second connection loses
  # :sasl_params.
  options = options.dup
  @sasl_params = options.delete(:sasl_params) || {}

  if options[:transport] == :sasl && @sasl_params.empty?
    raise ":transport is set to :sasl, but no :sasl_params option was supplied"
  end

  # Defaults to buffered transport, Hive 0.10, 1800 second timeout
  options[:transport] ||= :buffered
  options[:hive_version] ||= 10
  options[:timeout] ||= 1800
  @options = options

  # Look up the appropriate Thrift protocol version for the supplied Hive version
  @thrift_protocol_version = thrift_hive_protocol(options[:hive_version])

  @logger = logger
  @transport = thrift_transport(server, port)
  @protocol = Thrift::BinaryProtocol.new(@transport)
  @client = Hive2::Thrift::TCLIService::Client.new(@protocol)
  @session = nil
  @logger.info("Connecting to HiveServer2 #{server} on port #{port}")
end
Public Instance Methods
# Executes the statement returned by schema.add_columns_statement and
# returns whatever #execute returns.
def add_columns(schema)
  statement = schema.add_columns_statement
  execute(statement)
end
# Sends a CancelOperation request for the operation identified by the
# handles hash (validated inside prepare_cancel_request's helper chain).
def async_cancel(handles)
  cancel_request = prepare_cancel_request(handles)
  @client.CancelOperation(cancel_request)
end
# Closes the session referenced by handles[:session].
# The handles hash is validated first; an invalid hash raises.
def async_close_session(handles)
  validate_handles!(handles)
  close_request = Hive2::Thrift::TCloseSessionReq.new(sessionHandle: handles[:session])
  @client.CloseSession(close_request)
end
Async execute
# Submits the query with runAsync set and returns without waiting for it.
#
# Returns a Hash with :session, :guid and :secret, which the other async_*
# methods accept as their handles argument.
# Raises (through raise_error_if_failed!) when the server reports a failure.
def async_execute(query)
  @logger.info("Executing query asynchronously: #{query}")

  request = Hive2::Thrift::TExecuteStatementReq.new(
    sessionHandle: @session.sessionHandle,
    statement: query,
    runAsync: true
  )
  response = @client.ExecuteStatement(request)
  raise_error_if_failed!(response)

  # Enough information to find this query / session again later
  operation_id = response.operationHandle.operationId
  {
    session: @session.sessionHandle,
    guid: operation_id.guid,
    secret: operation_id.secret
  }
end
Async fetch results from an async execute
# Fetches up to max_rows rows (orientation :first) from an async query.
# Raises RuntimeError naming the current state when the query has not
# reached the :finished state.
def async_fetch(handles, max_rows = 100)
  if async_is_complete?(handles)
    fetch_rows(prepare_operation_handle(handles), :first, max_rows)
  else
    # Can't get data from an unfinished query
    raise "Can't perform fetch on a query in state: #{async_state(handles)}"
  end
end
Fetches the results of a finished async query in batches of batch_size rows and yields each batch to the given block as an array of rows.
# Yields the rows of a finished async query to the block, batch_size rows
# per fetch (orientation :next), stopping at the first empty batch.
# Raises RuntimeError when no block is given or the query is not :finished.
def async_fetch_in_batch(handles, batch_size = 1000, &block)
  raise "No block given for the batch fetch request!" unless block_given?

  # Can't get data from an unfinished query
  if !async_is_complete?(handles)
    raise "Can't perform fetch on a query in state: #{async_state(handles)}"
  end

  loop do
    batch = fetch_rows(prepare_operation_handle(handles), :next, batch_size)
    if batch.empty?
      break
    else
      yield batch
    end
  end
end
# True when async_state reports :cancelled for these handles.
def async_is_cancelled?(handles)
  current_state = async_state(handles)
  current_state == :cancelled
end
Is the query complete?
# True when async_state reports :finished for these handles.
def async_is_complete?(handles)
  current_state = async_state(handles)
  current_state == :finished
end
Has the query failed?
# True when async_state reports :error for these handles.
def async_is_failed?(handles)
  current_state = async_state(handles)
  current_state == :error
end
Is the query actually running?
# True when async_state reports :running for these handles.
def async_is_running?(handles)
  current_state = async_state(handles)
  current_state == :running
end
Map states to symbols
# Asks the server for the operation status and translates the Thrift
# TOperationState value into a symbol.
#
# Returns one of :finished, :initialized, :running, :cancelled, :closed,
# :error, :unknown, :pending, or :state_not_in_protocol for any other value.
# Raises RuntimeError when the response carries no operation state.
def async_state(handles)
  status_request = Hive2::Thrift::TGetOperationStatusReq.new(
    operationHandle: prepare_operation_handle(handles)
  )
  operation_state = @client.GetOperationStatus(status_request).operationState

  case operation_state
  when Hive2::Thrift::TOperationState::FINISHED_STATE    then :finished
  when Hive2::Thrift::TOperationState::INITIALIZED_STATE then :initialized
  when Hive2::Thrift::TOperationState::RUNNING_STATE     then :running
  when Hive2::Thrift::TOperationState::CANCELED_STATE    then :cancelled
  when Hive2::Thrift::TOperationState::CLOSED_STATE      then :closed
  when Hive2::Thrift::TOperationState::ERROR_STATE       then :error
  when Hive2::Thrift::TOperationState::UKNOWN_STATE      then :unknown
  when Hive2::Thrift::TOperationState::PENDING_STATE     then :pending
  when nil
    raise "No operation state found for handles - has the session been closed?"
  else
    :state_not_in_protocol
  end
end
# Closes the underlying transport and returns whatever its #close returns.
def close
  transport = @transport
  transport.close
end
# Sends CloseSession for the current session, then forgets the session.
# Returns nil.
def close_session
  close_request = prepare_close_session
  @client.CloseSession(close_request)
  @session = nil
end
# Executes the statement returned by schema.create_table_statement and
# returns whatever #execute returns.
def create_table(schema)
  statement = schema.create_table_statement
  execute(statement)
end
# Executes a DROP TABLE for the given table.
# name may be a TableSchema (its #name is used) or the table name itself.
# The name is interpolated between backticks without further escaping.
def drop_table(name)
  table_name = name.is_a?(TableSchema) ? name.name : name
  execute("DROP TABLE `#{table_name}`")
end
# Logs the query, sends it via ExecuteStatement and returns the response.
# Raises (through raise_error_if_failed!) when the response status is not
# a success.
def execute(query)
  @logger.info("Executing Hive Query: #{query}")
  request = prepare_execute_statement(query)
  client.ExecuteStatement(request).tap do |response|
    raise_error_if_failed!(response)
  end
end
Performs an EXPLAIN on the supplied query on the server and returns it as an ExplainResult. (Only works on 0.12 if you have this patch - issues.apache.org/jira/browse/HIVE-5492)
# Runs "EXPLAIN <query>" through fetch_in_batch, collects the :Explain
# entry of every row and wraps the flattened list in an ExplainResult.
def explain(query)
  explain_lines = []
  fetch_in_batch("EXPLAIN " + query) do |batch|
    explain_lines.concat(batch.map { |row| row[:Explain] })
  end
  ExplainResult.new(explain_lines.flatten)
end
Performs a query on the server, fetches up to max_rows rows and returns them as an array.
# Executes the query and returns up to max_rows rows fetched with
# orientation :first.
# Raises (through raise_error_if_failed!) when execution fails.
def fetch(query, max_rows = 100)
  response = execute(query)
  raise_error_if_failed!(response)

  # The operation handle identifies the result set to read from
  fetch_rows(response.operationHandle, :first, max_rows)
end
Performs a query on the server, fetches the results in batches of batch_size rows and yields the result batches to a given block as arrays of rows.
# Executes the query and yields its rows to the block in batches of
# batch_size rows (orientation :next), stopping at the first empty batch.
#
# Raises RuntimeError when no block is given, and (through
# raise_error_if_failed!) when execution fails.
def fetch_in_batch(query, batch_size = 1000, &block)
  raise "No block given for the batch fetch request!" unless block_given?

  # Execute the query and check the result
  exec_result = execute(query)
  raise_error_if_failed!(exec_result)

  # Get search operation handle to fetch the results
  op_handle = exec_result.operationHandle

  # fetch_rows builds its own fetch request on every call, so no request
  # needs to be prepared up front here.
  loop do
    rows = fetch_rows(op_handle, :next, batch_size)
    break if rows.empty?
    yield rows
  end
end
Pull rows from the query result
# Issues one FetchResults call for the operation and returns the rows of
# the response.
# Raises (through raise_error_if_failed!) when the fetch fails.
def fetch_rows(op_handle, orientation = :first, max_rows = 1000)
  request = prepare_fetch_results(op_handle, orientation, max_rows)
  response = @client.FetchResults(request)
  raise_error_if_failed!(response)
  response.results.rows
end
# Returns a two-element array describing the operation's result columns:
# the column names, and the primitive type of the first type entry of
# each column.
def get_column_info(op_handle)
  columns = get_schema_for(op_handle).columns
  names = columns.map { |column| column.columnName }
  types = columns.map { |column| column.typeDesc.types.first.primitiveEntry.type }
  [names, types]
end
# Delegates any method this class does not define to the Thrift client,
# forwarding positional arguments and the caller's block.
def method_missing(meth, *args, &block)
  client.send(meth, *args, &block)
end

# Keeps respond_to? in line with the delegation done by method_missing.
def respond_to_missing?(meth, include_private = false)
  client.respond_to?(meth, include_private) || super
end
# Opens the underlying transport and returns whatever its #open returns.
def open
  transport = @transport
  transport.open
end
# Opens a session using the Thrift protocol version chosen in the
# constructor, stores the response in @session and returns it.
def open_session
  open_request = prepare_open_session(@thrift_protocol_version)
  @session = @client.OpenSession(open_request)
end
Processes SASL connection params and returns a hash with symbol keys, or nil if the argument is not a hash
# Returns a copy of sasl_params with every key converted via #to_sym.
# Returns nil when sasl_params is not a Hash.
def parse_sasl_params(sasl_params)
  return nil unless sasl_params.kind_of?(Hash)

  # Symbolize keys in a hash
  sasl_params.each_with_object({}) do |(key, value), symbolized|
    symbolized[key.to_sym] = value
  end
end
# Sets the "mapred.job.priority" property through #set.
def priority=(priority)
  property = "mapred.job.priority"
  set(property, priority)
end
# Sets the "mapred.job.queue.name" property through #set.
def queue=(queue)
  property = "mapred.job.queue.name"
  set(property, queue)
end
# Executes the statement returned by schema.replace_columns_statement and
# returns whatever #execute returns.
def replace_columns(schema)
  statement = schema.replace_columns_statement
  execute(statement)
end
# Returns the session handle of the stored open-session response, or the
# falsy @session value itself (nil after construction or close_session).
def session
  current = @session
  current && current.sessionHandle
end
# Logs and executes "SET name=value"; returns whatever #execute returns.
def set(name,value)
  assignment = "#{name}=#{value}"
  @logger.info("Setting #{assignment}")
  execute("SET #{assignment}")
end
# Looks the Hive version up in HIVE_THRIFT_MAPPING and returns the mapped
# value. Raises RuntimeError when the mapping has no (truthy) entry.
def thrift_hive_protocol(version)
  protocol = HIVE_THRIFT_MAPPING[version]
  raise("Invalid Hive version") unless protocol
  protocol
end
# Creates a Thrift::Socket for server:port with the given timeout assigned
# and returns it.
def thrift_socket(server, port, timeout)
  Thrift::Socket.new(server, port).tap do |new_socket|
    new_socket.timeout = timeout
  end
end
# Builds the Thrift transport selected by @options[:transport].
#
# :buffered - BufferedTransport over a socket with @options[:timeout]
# :sasl     - SaslClientTransport over such a socket, with the symbolized
#             SASL params
# :http     - HTTPClientTransport pointed at http://server:port/cliservice
#
# Raises RuntimeError for any other transport value.
def thrift_transport(server, port)
  transport_type = @options[:transport]
  @logger.info("Initializing transport #{transport_type}")

  case transport_type
  when :buffered
    Thrift::BufferedTransport.new(thrift_socket(server, port, @options[:timeout]))
  when :sasl
    Thrift::SaslClientTransport.new(thrift_socket(server, port, @options[:timeout]),
                                    parse_sasl_params(@sasl_params))
  when :http
    Thrift::HTTPClientTransport.new("http://#{server}:#{port}/cliservice")
  else
    # Interpolate the configured value; there is no `transport` local here.
    raise "Unrecognised transport type '#{transport_type}'"
  end
end
# Fetches every row of the operation (first batch, then :next batches until
# an empty batch) and yields each one as a Hash keyed by column.
#
# columns    - keys for the yielded hashes, in column order
# convertors - parallel to columns; a non-nil entry is applied with [] to
#              each non-nil value, a nil entry leaves the value unchanged
#
# A nil column value is yielded as nil without being converted.
def yield_hash_rows(op_handle, columns, convertors)
  indexed_columns = columns.zip(convertors).each_with_index.map do |(column, convertor), index|
    [index, column, convertor]
  end

  batch = fetch_rows(op_handle)
  until batch.empty?
    batch.each do |row|
      record = {}
      values = row.colVals
      indexed_columns.each do |index, column, convertor|
        raw = values[index].get_value.value
        record[column] =
          if raw.nil?
            nil
          elsif convertor
            convertor[raw]
          else
            raw
          end
      end
      yield record
    end
    batch = fetch_rows(op_handle, :next)
  end
end
Private Instance Methods
# Requests the result-set metadata for the operation handle and returns
# its schema.
def get_schema_for(handle)
  metadata_request = ::Hive2::Thrift::TGetResultSetMetadataReq.new(operationHandle: handle)
  client.GetResultSetMetadata(metadata_request).schema
end
# Builds a TCancelOperationReq for the operation described by handles.
def prepare_cancel_request(handles)
  operation_handle = prepare_operation_handle(handles)
  Hive2::Thrift::TCancelOperationReq.new(operationHandle: operation_handle)
end
# Builds a TCloseSessionReq for the current session handle.
def prepare_close_session
  current_handle = self.session
  ::Hive2::Thrift::TCloseSessionReq.new(sessionHandle: current_handle)
end
# Builds a TExecuteStatementReq for the current session.
# The query is converted with #to_s, and a fixed confOverlay entry for
# "impala.resultset.cache.size" is always attached.
def prepare_execute_statement(query)
  overlay = { "impala.resultset.cache.size" => "100000" }
  ::Hive2::Thrift::TExecuteStatementReq.new(
    sessionHandle: self.session,
    statement: query.to_s,
    confOverlay: overlay
  )
end
# Builds a TFetchResultsReq for the given operation handle.
#
# handle      - operation handle to read from
# orientation - symbol such as :first or :next; it is upcased and prefixed
#               with "FETCH_" to name a TFetchOrientation constant
# rows        - maximum number of rows to request (default 100)
#
# Raises ArgumentError when the orientation does not name a value listed in
# TFetchOrientation::VALUE_MAP.
def prepare_fetch_results(handle, orientation=:first, rows=100)
  orientation_value = "FETCH_#{orientation.to_s.upcase}"
  valid_orientations = ::Hive2::Thrift::TFetchOrientation::VALUE_MAP.values
  unless valid_orientations.include?(orientation_value)
    raise ArgumentError, "Invalid orientation: #{orientation.inspect}"
  end

  # Resolve the constant by name instead of eval-ing a built string; the
  # `false` restricts the lookup to TFetchOrientation itself.
  orientation_const = ::Hive2::Thrift::TFetchOrientation.const_get(orientation_value, false)
  ::Hive2::Thrift::TFetchResultsReq.new(
    operationHandle: handle,
    orientation: orientation_const,
    maxRows: rows
  )
end
# Builds a TOpenSessionReq, initialised from @sasl_params when they are
# non-empty (otherwise from an empty array), and sets its client_protocol.
def prepare_open_session(client_protocol)
  initial_fields = @sasl_params.empty? ? [] : @sasl_params
  ::Hive2::Thrift::TOpenSessionReq.new(initial_fields).tap do |request|
    request.client_protocol = client_protocol
  end
end
# Rebuilds a TOperationHandle from a handles hash (as returned by
# async_execute). The handle is always built with operation type
# EXECUTE_STATEMENT and hasResultSet set to false.
# Raises when the handles hash fails validate_handles!.
def prepare_operation_handle(handles)
  validate_handles!(handles)

  identifier = Hive2::Thrift::THandleIdentifier.new(
    guid: handles[:guid],
    secret: handles[:secret]
  )
  Hive2::Thrift::TOperationHandle.new(
    operationId: identifier,
    operationType: Hive2::Thrift::TOperationType::EXECUTE_STATEMENT,
    hasResultSet: false
  )
end
Raises an exception if given operation result is a failure
# Returns nil when the result's status code is 0; otherwise raises
# RBHive::TCLIConnectionError with the status' error message, or
# 'Execution failed!' when the message is missing.
# NOTE(review): every non-zero status code is treated as a failure — confirm
# whether other success-like codes from the server should be accepted.
def raise_error_if_failed!(result)
  status = result.status
  return if status.statusCode == 0

  raise RBHive::TCLIConnectionError.new(status.errorMessage || 'Execution failed!')
end
# Checks that handles carries the :guid, :secret and :session keys.
#
# Returns nil when valid. Raises RuntimeError ("Invalid handles hash: ...")
# when a key is missing, and also when handles is nil or any other object
# that cannot be queried with has_key?.
def validate_handles!(handles)
  required_keys = [:guid, :secret, :session]
  valid = handles.respond_to?(:has_key?) &&
          required_keys.all? { |key| handles.has_key?(key) }
  raise "Invalid handles hash: #{handles.inspect}" unless valid
end