class RBHive::Connection

Attributes

client [R] — read-only accessor for the Thrift Hive client that this connection wraps

Public Class Methods

new(server, port=10_000, logger=StdOutLogger.new) click to toggle source
   # File lib/rbhive/connection.rb
# Builds the Thrift socket -> buffered transport -> binary protocol -> Hive
# client stack for the given server. Nothing is opened here; the transport
# is only opened by #open.
#
# @param server [String] host passed to Thrift::Socket
# @param port [Integer] port passed to Thrift::Socket
# @param logger [#info] receives informational log messages
def initialize(server, port = 10_000, logger = StdOutLogger.new)
  @logger = logger
  # Guards every call routed through #safe.
  @mutex = Mutex.new

  @socket = Thrift::Socket.new(server, port)
  @transport = Thrift::BufferedTransport.new(@socket)
  @protocol = Thrift::BinaryProtocol.new(@transport)
  @client = Hive::Thrift::ThriftHive::Client.new(@protocol)

  @logger.info("Connecting to #{server} on port #{port}")
end

Public Instance Methods

add_columns(schema) click to toggle source
    # File lib/rbhive/connection.rb
# Executes the statement produced by +schema.add_columns_statement+.
#
# @param schema [#add_columns_statement] object that renders the statement
# @return [Object] the result of #execute
def add_columns(schema)
  statement = schema.add_columns_statement
  execute(statement)
end
close() click to toggle source
   # File lib/rbhive/connection.rb
# Closes the buffered transport created in the constructor.
#
# @return [Object] whatever the transport's +close+ returns
def close
  @transport.close
end
create_table(schema) click to toggle source
    # File lib/rbhive/connection.rb
# Executes the statement produced by +schema.create_table_statement+.
#
# @param schema [#create_table_statement] object that renders the statement
# @return [Object] the result of #execute
def create_table(schema)
  statement = schema.create_table_statement
  execute(statement)
end
drop_table(name) click to toggle source
    # File lib/rbhive/connection.rb
# Drops a table, given either its name or a TableSchema.
#
# @param name [String, TableSchema] table name, or a schema whose +name+
#   is used instead
# @return [Object] the result of #execute
def drop_table(name)
  table_name = name.is_a?(TableSchema) ? name.name : name
  execute("DROP TABLE `#{table_name}`")
end
execute(query) click to toggle source
   # File lib/rbhive/connection.rb
# Runs a query while holding the connection lock.
#
# @param query [String] statement to send to Hive
# @return [Object] whatever the client's +execute+ returns
def execute(query)
  safe { execute_unsafe(query) }
end
explain(query) click to toggle source
   # File lib/rbhive/connection.rb
# Runs +EXPLAIN+ on the query and wraps every returned row in an
# ExplainResult. Both steps happen under the connection lock.
#
# @param query [String] statement to explain
# @return [ExplainResult]
def explain(query)
  safe do
    statement = "EXPLAIN " + query
    execute_unsafe(statement)
    plan_rows = client.fetchAll
    ExplainResult.new(plan_rows)
  end
end
fetch(query) click to toggle source
   # File lib/rbhive/connection.rb
# Executes the query and returns every row as one ResultSet. The schema is
# built from the client's schema plus the first row (nil when there are no
# rows). Everything runs under the connection lock.
#
# @param query [String] statement to send to Hive
# @return [ResultSet]
def fetch(query)
  safe do
    execute_unsafe(query)
    result_rows = client.fetchAll
    schema_definition = SchemaDefinition.new(client.getSchema, result_rows.first)
    ResultSet.new(result_rows, schema_definition)
  end
end
fetch_in_batch(query, batch_size=1_000) { |result_set| ... } click to toggle source
   # File lib/rbhive/connection.rb
# Executes the query and yields its rows in batches of up to +batch_size+,
# stopping at the first empty batch. The connection lock is held for the
# whole iteration, including while the caller's block runs.
#
# @param query [String] statement to send to Hive
# @param batch_size [Integer] row count requested per +fetchN+ call
# @yieldparam result_set [ResultSet] one batch of rows
# @return [nil]
def fetch_in_batch(query, batch_size = 1_000)
  safe do
    execute_unsafe(query)

    # Built from the first non-empty batch, then reused for later batches.
    batch_schema = nil
    batch = client.fetchN(batch_size)
    until batch.empty?
      batch_schema ||= SchemaDefinition.new(client.getSchema, batch.first)
      yield ResultSet.new(batch, batch_schema)
      batch = client.fetchN(batch_size)
    end
  end
end
first(query) click to toggle source
    # File lib/rbhive/connection.rb
 99 def first(query)
100   safe do
101     execute_unsafe(query)
102     row = client.fetchOne
103     the_schema = SchemaDefinition.new(client.getSchema, row)
104     ResultSet.new([row], the_schema).first
105   end
106 end
method_missing(meth, *args) click to toggle source
    # File lib/rbhive/connection.rb
# Forwards any message this class does not define to the underlying client,
# passing along the positional arguments and any block. Forwarded calls do
# NOT go through the connection lock.
#
# @param meth [Symbol] name of the missing method
# @param args [Array] positional arguments for the call
# @return [Object] whatever the client returns for that call
def method_missing(meth, *args, &block)
  client.send(meth, *args, &block)
end

# Keeps +respond_to?+ and +method+ consistent with the forwarding done by
# +method_missing+: true when the client responds to +meth+.
#
# @param meth [Symbol] method name being queried
# @param include_private [Boolean] whether private methods count
# @return [Boolean]
def respond_to_missing?(meth, include_private = false)
  client.respond_to?(meth, include_private) || super
end
open() click to toggle source
   # File lib/rbhive/connection.rb
# Opens the buffered transport created in the constructor.
#
# @return [Object] whatever the transport's +open+ returns
def open
  @transport.open
end
priority=(priority) click to toggle source
   # File lib/rbhive/connection.rb
# Sets the +mapred.job.priority+ property via #set.
#
# @param priority [Object] value interpolated into the SET statement
def priority=(priority)
  property = "mapred.job.priority"
  set(property, priority)
end
queue=(queue) click to toggle source
   # File lib/rbhive/connection.rb
# Sets the +mapred.job.queue.name+ property via #set.
#
# @param queue [Object] value interpolated into the SET statement
def queue=(queue)
  property = "mapred.job.queue.name"
  set(property, queue)
end
replace_columns(schema) click to toggle source
    # File lib/rbhive/connection.rb
# Executes the statement produced by +schema.replace_columns_statement+.
#
# @param schema [#replace_columns_statement] object that renders the statement
# @return [Object] the result of #execute
def replace_columns(schema)
  statement = schema.replace_columns_statement
  execute(statement)
end
schema(example_row=[]) click to toggle source
    # File lib/rbhive/connection.rb
# Wraps the client's current schema in a SchemaDefinition, under the
# connection lock.
#
# @param example_row [Array] row passed to SchemaDefinition with the schema
# @return [SchemaDefinition]
def schema(example_row = [])
  safe do
    raw_schema = client.getSchema
    SchemaDefinition.new(raw_schema, example_row)
  end
end
set(name,value) click to toggle source
   # File lib/rbhive/connection.rb
# Issues a Hive +SET name=value+ statement.
#
# The statement runs under the connection lock, like every other query
# path in this class, so it cannot interleave with a query another thread
# is running on the same client. Logging happens inside the lock so the
# log order matches the execution order.
#
# Ruby's Mutex is not reentrant: do not call this from a block that is
# already running under the lock (for example inside #fetch_in_batch).
#
# @param name [#to_s] property name
# @param value [#to_s] property value
# @return [Object] whatever the client's +execute+ returns
def set(name, value)
  safe do
    @logger.info("Setting #{name}=#{value}")
    client.execute("SET #{name}=#{value}")
  end
end

Private Instance Methods

execute_safe(query) click to toggle source
    # File lib/rbhive/connection.rb
# Runs #execute_unsafe while holding the connection lock.
#
# @param query [String] statement to send to Hive
# @return [Object] whatever the client's +execute+ returns
def execute_safe(query)
  safe do
    execute_unsafe(query)
  end
end
execute_unsafe(query) click to toggle source
    # File lib/rbhive/connection.rb
# Logs the query and sends it to the client without taking the connection
# lock; callers are expected to hold it already (see #safe).
#
# @param query [String] statement to send to Hive
# @return [Object] whatever the client's +execute+ returns
def execute_unsafe(query)
  log_line = "Executing Hive Query: #{query}"
  @logger.info(log_line)
  client.execute(query)
end
safe() { || ... } click to toggle source
    # File lib/rbhive/connection.rb
# Runs the given block while holding the connection mutex and returns the
# block's value. Mutex#synchronize already returns the block's result and
# releases the lock if the block raises, so no temporary is needed.
#
# @yield the work to perform under the lock
# @return [Object] the block's return value
def safe
  @mutex.synchronize { yield }
end