class RBHive::Connection
Attributes
client [R] — read-only accessor for the underlying Hive Thrift client
Public Class Methods
new(server, port=10_000, logger=StdOutLogger.new)
click to toggle source
# File lib/rbhive/connection.rb, line 34
#
# Assembles the Thrift stack for talking to a Hive server:
# socket -> buffered transport -> binary protocol -> ThriftHive client.
# The transport is built here but NOT opened; see #open.
#
# @param server [String] host handed to Thrift::Socket
# @param port [Integer] Thrift port, 10_000 unless overridden
# @param logger [#info] sink for progress messages (only #info is called on it
#   in this class); defaults to a fresh StdOutLogger
def initialize(server, port=10_000, logger=StdOutLogger.new)
  socket    = Thrift::Socket.new(server, port)
  transport = Thrift::BufferedTransport.new(socket)
  protocol  = Thrift::BinaryProtocol.new(transport)

  @socket, @transport, @protocol = socket, transport, protocol
  @client = Hive::Thrift::ThriftHive::Client.new(protocol)

  @logger = logger
  @logger.info("Connecting to #{server} on port #{port}")

  # Serializes access to the single shared Thrift client (see #safe).
  @mutex = Mutex.new
end
Public Instance Methods
add_columns(schema)
click to toggle source
# File lib/rbhive/connection.rb, line 125
#
# Runs the DDL that +schema+ produces for adding its columns.
#
# @param schema [#add_columns_statement] supplies the statement to run
# @return whatever #execute returns
def add_columns(schema)
  ddl = schema.add_columns_statement
  execute(ddl)
end
close()
click to toggle source
# File lib/rbhive/connection.rb, line 48
#
# Closes the Thrift transport built in #initialize.
#
# @return whatever the transport's #close returns
def close
  transport = @transport
  transport.close
end
create_table(schema)
click to toggle source
# File lib/rbhive/connection.rb, line 112
#
# Runs the CREATE TABLE statement produced by +schema+.
#
# @param schema [#create_table_statement] supplies the statement to run
# @return whatever #execute returns
def create_table(schema)
  ddl = schema.create_table_statement
  execute(ddl)
end
drop_table(name)
click to toggle source
# File lib/rbhive/connection.rb, line 116
#
# Drops a table given either its name or a TableSchema (whose #name is used).
# The name is wrapped in backticks verbatim; it is not escaped.
#
# @param name [String, TableSchema] table to drop
# @return whatever #execute returns
def drop_table(name)
  table_name = name.is_a?(TableSchema) ? name.name : name
  execute("DROP TABLE `#{table_name}`")
end
execute(query)
click to toggle source
# File lib/rbhive/connection.rb, line 56
#
# Public entry point for running a statement whose results are not fetched.
# Delegates to the private #execute_safe, which holds the connection mutex.
#
# @param query [String] HiveQL to run
# @return the value produced by #execute_safe
def execute(query)
  execute_safe(query)
end
explain(query)
click to toggle source
# File lib/rbhive/connection.rb, line 60
#
# Runs "EXPLAIN <query>" under the connection mutex and wraps every row
# returned by fetchAll in an ExplainResult.
# +query+ is appended with String#+, so it must be a String.
#
# @param query [String] HiveQL to explain
# @return [ExplainResult]
def explain(query)
  safe do
    execute_unsafe("EXPLAIN " + query)
    plan_rows = client.fetchAll
    ExplainResult.new(plan_rows)
  end
end
fetch(query)
click to toggle source
# File lib/rbhive/connection.rb, line 80
#
# Runs +query+ and loads the complete result into memory via fetchAll.
# The SchemaDefinition is built from the client's schema plus the first row
# as an example (nil when there are no rows).
# For large results see #fetch_in_batch.
#
# @param query [String] HiveQL to run
# @return [ResultSet]
def fetch(query)
  safe do
    execute_unsafe(query)
    all_rows   = client.fetchAll
    definition = SchemaDefinition.new(client.getSchema, all_rows.first)
    ResultSet.new(all_rows, definition)
  end
end
fetch_in_batch(query, batch_size=1_000) { |result_set| ... }
click to toggle source
# File lib/rbhive/connection.rb, line 89
#
# Runs +query+ and yields its results as ResultSet chunks of at most
# +batch_size+ rows, pulled with the Thrift client's fetchN until an empty
# batch comes back.
#
# The SchemaDefinition is built once, from the first batch's first row, and
# shared by every yielded ResultSet. The whole loop runs inside #safe, so the
# connection mutex stays held while the caller's block processes each batch.
# Ruby's Mutex is not reentrant, so calling another #safe-wrapped method on
# this connection from inside the block raises ThreadError.
#
# Without a block, an Enumerator is returned and nothing is executed until it
# is iterated. Previously, the query was run first and the call then failed
# with LocalJumpError, or silently returned nil for empty results.
#
# @param query [String] HiveQL to run
# @param batch_size [Integer] maximum rows per fetchN call (default 1_000)
# @yieldparam result_set [ResultSet] one batch of rows
# @return [nil] when a block is given
# @return [Enumerator] when no block is given
def fetch_in_batch(query, batch_size=1_000)
  return enum_for(:fetch_in_batch, query, batch_size) unless block_given?

  safe do
    execute_unsafe(query)
    the_schema = nil
    until (next_batch = client.fetchN(batch_size)).empty?
      the_schema ||= SchemaDefinition.new(client.getSchema, next_batch.first)
      yield ResultSet.new(next_batch, the_schema)
    end
  end
end
first(query)
click to toggle source
# File lib/rbhive/connection.rb, line 99
#
# Runs +query+ and returns only the first row, taken from a single fetchOne
# call and wrapped through ResultSet so that it gets the same row
# representation as #fetch.
# NOTE(review): behaviour for an empty result depends on what fetchOne
# returns in that case, which is not visible here.
#
# @param query [String] HiveQL to run
# @return the first element of the one-row ResultSet
def first(query)
  safe do
    execute_unsafe(query)
    single_row = client.fetchOne
    definition = SchemaDefinition.new(client.getSchema, single_row)
    ResultSet.new([single_row], definition).first
  end
end
method_missing(meth, *args)
click to toggle source
# File lib/rbhive/connection.rb, line 129
#
# Forwards any method this connection does not define to the underlying
# Thrift client (#client), so the raw ThriftHive API stays reachable.
# Forwarded calls do not take the connection mutex.
#
# Forwarding now passes blocks through. Names the client does not respond to
# fall back to +super+, so the NoMethodError names this connection.
def method_missing(meth, *args, &block)
  if client.respond_to?(meth)
    client.send(meth, *args, &block)
  else
    super
  end
end

# Keeps respond_to?/method consistent with #method_missing: anything the
# client publicly responds to is reported as supported here too.
def respond_to_missing?(meth, include_private = false)
  client.respond_to?(meth) || super
end
open()
click to toggle source
# File lib/rbhive/connection.rb, line 44
#
# Opens the Thrift transport that #initialize built but left closed.
#
# @return whatever the transport's #open returns
def open
  transport = @transport
  transport.open
end
priority=(priority)
click to toggle source
# File lib/rbhive/connection.rb, line 67
#
# Sets the Hive session variable mapred.job.priority via #set.
#
# @param priority [#to_s] value interpolated into the SET statement
def priority=(priority)
  setting = "mapred.job.priority"
  set(setting, priority)
end
queue=(queue)
click to toggle source
# File lib/rbhive/connection.rb, line 71
#
# Sets the Hive session variable mapred.job.queue.name via #set.
#
# @param queue [#to_s] value interpolated into the SET statement
def queue=(queue)
  setting = "mapred.job.queue.name"
  set(setting, queue)
end
replace_columns(schema)
click to toggle source
# File lib/rbhive/connection.rb, line 121
#
# Runs the DDL that +schema+ produces for replacing a table's columns.
#
# @param schema [#replace_columns_statement] supplies the statement to run
# @return whatever #execute returns
def replace_columns(schema)
  ddl = schema.replace_columns_statement
  execute(ddl)
end
schema(example_row=[])
click to toggle source
# File lib/rbhive/connection.rb, line 108
#
# Builds a SchemaDefinition from the client's current getSchema result
# (presumably the schema of the most recently executed query; confirm),
# using +example_row+ as the sample row.
#
# @param example_row [Array] sample row handed to SchemaDefinition (default [])
# @return [SchemaDefinition]
def schema(example_row=[])
  safe do
    raw_schema = client.getSchema
    SchemaDefinition.new(raw_schema, example_row)
  end
end
set(name,value)
click to toggle source
# File lib/rbhive/connection.rb, line 75
#
# Sets a Hive session variable by sending "SET name=value" to the client.
#
# The statement now goes through #safe, like every other statement this class
# sends. Previously it called the Thrift client directly, so a concurrent
# call could interleave with another thread's execute -> fetch sequence on the
# shared client.
#
# Because the mutex is not reentrant, calling this from inside a block yielded
# by another #safe-wrapped method (e.g. #fetch_in_batch) raises ThreadError.
#
# +name+ and +value+ are interpolated verbatim; do not pass untrusted input.
#
# @param name [String] configuration key, e.g. "mapred.job.priority"
# @param value [#to_s] value to assign
# @return the client's #execute result
def set(name,value)
  @logger.info("Setting #{name}=#{value}")
  safe { client.execute("SET #{name}=#{value}") }
end
Private Instance Methods
execute_safe(query)
click to toggle source
# File lib/rbhive/connection.rb, line 135
#
# Runs +query+ through #execute_unsafe while holding the connection mutex.
#
# @param query [String] HiveQL to run
# @return the client's #execute result, as passed back through #safe
def execute_safe(query)
  safe do
    execute_unsafe(query)
  end
end
execute_unsafe(query)
click to toggle source
# File lib/rbhive/connection.rb, line 139
#
# Logs +query+ and sends it to the Thrift client WITHOUT taking the mutex.
# Every caller visible in this file invokes it from inside #safe.
#
# @param query [String] HiveQL to run
# @return the client's #execute result
def execute_unsafe(query)
  message = "Executing Hive Query: #{query}"
  @logger.info(message)
  client.execute(query)
end
safe() { || ... }
click to toggle source
# File lib/rbhive/connection.rb, line 144
#
# Runs the given block while holding the connection mutex and returns the
# block's value. Query methods such as #fetch and #explain wrap their
# execute -> fetch sequence in this, so it is not interleaved across threads
# on the shared Thrift client.
#
# Ruby's Mutex is not reentrant: re-entering #safe on the same thread (directly
# or via another wrapped method) raises ThreadError. The lock is released even
# if the block raises.
#
# @yield the work to perform under the lock
# @return the block's return value
def safe
  # Mutex#synchronize already returns the block's value, so no temp is needed.
  # Use an explicit block rather than &block so that a missing block still
  # raises LocalJumpError, as before.
  @mutex.synchronize { yield }
end