class Impala::Cursor

A cursor iterates over a query's result set without loading it all into memory at once, which is useful when a query returns many rows: rows are fetched from the server in batches as you iterate. The class includes Enumerable, so each, select, map, and the other Enumerable methods are available.

Constants

BUFFER_SIZE
NULL
TYPECAST_MAP

Attributes

handle[R]
progress_reporter[R]
typecast_map[R]

Public Class Methods

new(handle, service, options = {}) click to toggle source
   # File lib/impala/cursor.rb
# Wraps a server-side query handle so its results can be streamed.
#
# @param handle query handle passed to every RPC this cursor makes
# @param service client used for fetch/close/get_state and friends
# @param options [Hash] cursor options. :poll_every sets the sleep interval
#   (seconds) used by #wait!, defaulting to 0.1. A copy of the whole hash is
#   also handed to the ProgressReporter.
def initialize(handle, service, options = {})
  @handle  = handle
  @service = service
  @options = options.dup

  # Row-buffering state consulted by fetch_row / has_more? / fetch_batch.
  @row_buffer = []
  @done       = false
  @open       = true

  # Per-cursor copy: mutating it through #typecast_map does not change the
  # class-wide TYPECAST_MAP.
  @typecast_map = TYPECAST_MAP.dup

  @progress_reporter = ProgressReporter.new(self, @options)
  @poll_every = options.fetch(:poll_every, 0.1)
end
typecast_boolean(value) click to toggle source
   # File lib/impala/cursor.rb
 9 def self.typecast_boolean(value)
10   value == 'true'
11 end
typecast_decimal(value) click to toggle source
   # File lib/impala/cursor.rb
# Converts a raw DECIMAL column value to a BigDecimal.
#
# Uses Kernel#BigDecimal rather than BigDecimal.new. BigDecimal.new was
# deprecated and then removed in bigdecimal 2.0 (bundled with Ruby 2.7+),
# where calling it raises NoMethodError.
#
# @param value [String] the raw column text, e.g. "12.340"
# @return [BigDecimal]
def self.typecast_decimal(value)
  BigDecimal(value)
end
typecast_float(value) click to toggle source
   # File lib/impala/cursor.rb
# Converts a raw FLOAT/DOUBLE column value to a Ruby Float.
#
# String#to_f returns 0.0 for non-finite spellings such as "inf", "Infinity"
# or "NaN", which would silently turn them into zero. Those spellings are
# mapped explicitly; everything else goes through String#to_f as before.
# NOTE(review): the exact spelling the server uses for non-finite values is
# not visible here, so both short ("inf"/"nan") and long ("Infinity"/"NaN")
# forms are accepted, case-insensitively.
#
# @param value [String] the raw column text
# @return [Float]
def self.typecast_float(value)
  case value.to_s.strip.downcase
  when 'inf', '+inf', 'infinity', '+infinity' then Float::INFINITY
  when '-inf', '-infinity' then -Float::INFINITY
  when 'nan', '+nan', '-nan' then Float::NAN
  else value.to_f
  end
end
typecast_int(value) click to toggle source
   # File lib/impala/cursor.rb
# Converts a raw integer column value to an Integer.
#
# String#to_i is lenient: text it cannot parse yields 0 instead of raising.
#
# @param value [String] the raw column text
# @return [Integer]
def self.typecast_int(value)
  value.to_i
end
typecast_timestamp(value) click to toggle source
   # File lib/impala/cursor.rb
# Converts a raw TIMESTAMP column value to a Time via Time.parse.
#
# Time.parse interprets a string that carries no zone information in the
# local time zone of the running process.
# NOTE(review): if the server's timestamps are meant to be UTC, results will
# shift with the client's TZ setting. Confirm this is intended.
#
# @param value [String] the raw column text, e.g. "2015-01-02 03:04:05"
# @return [Time]
def self.typecast_timestamp(value)
  Time.parse(value)
end

Public Instance Methods

close() click to toggle source

Close the cursor on the remote server. Once a cursor is closed, you can no longer fetch any rows from it.

    # File lib/impala/cursor.rb
 98 def close
 99   @open = false
100   @service.close(@handle)
101 end
columns() click to toggle source
   # File lib/impala/cursor.rb
# Returns the result set's column names in schema order. The names are
# computed once and then memoized.
#
# @return [Array] the +name+ of each entry in the metadata schema's
#   fieldSchemas
def columns
  return @columns if @columns

  @columns = metadata.schema.fieldSchemas.map { |field_schema| field_schema.name }
end
each() { |row| ... } click to toggle source
   # File lib/impala/cursor.rb
# Yields each remaining row (a Hash of column name => value) in turn,
# fetching more rows from the server as the local buffer empties.
#
# Called without a block, it returns an Enumerator (the usual contract for
# #each). This avoids consuming a row and then raising LocalJumpError.
#
# @yieldparam row [Hash] the next row
# @return [Enumerator] when no block is given; otherwise self
def each
  return enum_for(:each) unless block_given?

  while (row = fetch_row)
    yield row
  end
  self
end
exec_summary() click to toggle source
    # File lib/impala/cursor.rb
# Fetches the query's execution summary from the service.
#
# @return the service's GetExecSummary response for this cursor's handle
#   (#progress reads its +progress+ field)
def exec_summary
  query_handle = @handle
  @service.GetExecSummary(query_handle)
end
fetch_all() click to toggle source

Returns all the remaining rows in the result set.

@return [Array<Hash>] the remaining rows in the result set

@see fetch_row

   # File lib/impala/cursor.rb
# Returns every remaining row of the result set.
#
# Drains the cursor through #each. Only rows not yet consumed by earlier
# calls are returned, and the whole remainder is held in memory at once.
#
# @return [Array<Hash>]
# @see fetch_row
def fetch_all
  to_a
end
fetch_row() click to toggle source

Returns the next available row as a hash, or nil if there are none left.

@return [Hash, nil] the next available row, or nil if there are none left

@see fetch_all

   # File lib/impala/cursor.rb
# Returns the next row as a Hash, or nil once the result set is exhausted.
# When the local buffer is empty and the server still has rows, the buffer
# is refilled first.
#
# @return [Hash, nil]
# @see fetch_all
def fetch_row
  return @row_buffer.shift unless @row_buffer.empty?
  return nil if @done

  fetch_more
  @row_buffer.shift
end
has_more?() click to toggle source

Returns true if there are any more rows to fetch.

    # File lib/impala/cursor.rb
# Returns true while rows remain, either already buffered locally or not yet
# fetched from the server.
#
# @return [Boolean]
def has_more?
  !(@done && @row_buffer.empty?)
end
inspect() click to toggle source
   # File lib/impala/cursor.rb
# Short description showing the class, the query id (when a handle is
# present) and a " (CLOSED)" marker once the cursor is closed.
#
# @return [String]
def inspect
  parts = ["#<#{self.class}"]
  parts << " QueryID: #{handle.id}" if handle
  parts << ' (CLOSED)' unless open?
  parts << '>'
  parts.join
end
open?() click to toggle source

Returns true if the cursor is still open.

    # File lib/impala/cursor.rb
# Whether the cursor can still fetch rows.
#
# @return [Boolean] true until #close runs, or until a fetch fails with a
#   Beeswax exception (#fetch_batch then marks the cursor closed)
def open?
  @open
end
progress() click to toggle source

Returns the query's progress as the fraction of scan ranges completed (completed scan ranges divided by total scan ranges).

    # File lib/impala/cursor.rb
# Returns the query's progress as the fraction of scan ranges completed.
#
# Returns 0.0 instead of dividing when there is nothing to divide by:
# - the summary has no progress section, or
# - the total scan-range count is missing or zero.
# nil.to_f is 0.0, so the plain division would yield NaN.
# NOTE(review): assumes the summary's +progress+ field may be absent before
# the query is planned. Confirm against the service's schema.
#
# @return [Float]
def progress
  scan_progress = exec_summary.progress
  return 0.0 unless scan_progress

  total = scan_progress.total_scan_ranges.to_f
  return 0.0 unless total > 0

  scan_progress.num_completed_scan_ranges.to_f / total
end
query_done?() click to toggle source

Returns true if the query has finished running, either successfully (so results can be fetched) or with an error.

    # File lib/impala/cursor.rb
# Whether the server reports the query in a terminal state: FINISHED
# (results are available) or EXCEPTION (it failed).
#
# @return [Boolean]
def query_done?
  terminal_states = [
    Protocol::Beeswax::QueryState::EXCEPTION,
    Protocol::Beeswax::QueryState::FINISHED
  ]
  current_state = @service.get_state(@handle)
  terminal_states.include?(current_state)
end
runtime_profile() click to toggle source
    # File lib/impala/cursor.rb
# Fetches the query's runtime profile from the service.
#
# @return whatever the service's GetRuntimeProfile call returns for this
#   cursor's handle
def runtime_profile
  query_handle = @handle
  @service.GetRuntimeProfile(query_handle)
end
wait!() click to toggle source

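Blocks until the query has finished running (successfully or with an error), polling the server every `poll_every` seconds (0.1 by default).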

    # File lib/impala/cursor.rb
# Blocks until the query reaches a terminal state. Between state checks it
# runs the progress callback and sleeps @poll_every seconds.
#
# @return [nil]
def wait!
  loop do
    break if query_done?

    periodic_callback
    sleep @poll_every
  end
end

Private Instance Methods

exceptional?() click to toggle source
    # File lib/impala/cursor.rb
# Whether the server reports the query in the EXCEPTION state.
#
# @return [Boolean]
def exceptional?
  current_state = @service.get_state(@handle)
  current_state == Protocol::Beeswax::QueryState::EXCEPTION
end
fetch_batch() click to toggle source
    # File lib/impala/cursor.rb
# Fetches one batch of rows from the server, parses each raw line with
# #parse_row and appends the results to the row buffer. BUFFER_SIZE is passed
# to the service's fetch call. When the response says no more rows remain,
# the cursor is marked done and closed.
# NOTE(review): the literal +false+ passed to fetch is presumably Beeswax's
# "start over" flag. Confirm against the service definition.
#
# @raise [CursorError] if the cursor is already closed, or if the fetch call
#   raises Protocol::Beeswax::BeeswaxException (the cursor is then flagged
#   closed without calling #close)
# @raise [ConnectionError] if the server reports the query in the EXCEPTION
#   state
def fetch_batch
  raise CursorError, "Cursor has expired or been closed" unless @open
  raise ConnectionError, "The query was aborted" if exceptional?

  response =
    begin
      @service.fetch(@handle, false, BUFFER_SIZE)
    rescue Protocol::Beeswax::BeeswaxException
      @open = false
      raise CursorError, "Cursor has expired or been closed"
    end

  @row_buffer.concat(response.data.map { |raw_row| parse_row(raw_row) })
  return if response.has_more

  @done = true
  close
end
fetch_more() click to toggle source
    # File lib/impala/cursor.rb
# Keeps fetching batches until the buffer holds at least BUFFER_SIZE rows or
# the server has no more rows to give.
#
# @return [nil]
def fetch_more
  until @done || @row_buffer.count >= BUFFER_SIZE
    fetch_batch
  end
end
metadata() click to toggle source
    # File lib/impala/cursor.rb
# Returns the result-set metadata for this cursor's handle. The service is
# asked once and the answer is memoized.
#
# @return the service's get_results_metadata response (read for +schema+
#   and +delim+ elsewhere in this class)
def metadata
  return @metadata if @metadata

  @metadata = @service.get_results_metadata(@handle)
end
parse_row(raw) click to toggle source
    # File lib/impala/cursor.rb
# Parses one raw, delimiter-separated result line into a Hash keyed by column
# name, applying each column's typecaster. Fields equal to NULL become nil.
#
# String#split is given a negative limit so that trailing empty fields are
# kept. Without it, an empty string in the last column(s) would vanish and
# come back as nil instead of "". An empty line is treated as one empty
# field, because "".split always returns [].
#
# @param raw [String] one line of a fetch response's data
# @return [Hash] column name => typecast value (nil for NULL)
def parse_row(raw)
  fields = raw.empty? ? [raw] : raw.split(metadata.delim, -1)

  row_convertor.each_with_object({}) do |(column, caster, index), row|
    value = fields[index]
    row[column] =
      if value == NULL
        nil
      elsif caster
        caster.call(value)
      else
        value
      end
  end
end
periodic_callback() click to toggle source
    # File lib/impala/cursor.rb
# Reports progress through the progress reporter, but only when the reporter
# says it should be shown.
#
# @return [nil] when nothing is shown; otherwise the reporter's report result
def periodic_callback
  reporter = progress_reporter
  reporter.report if reporter.show?
end
row_convertor() click to toggle source
    # File lib/impala/cursor.rb
# Builds (once) the per-column conversion plan used by #parse_row.
#
# @return [Array<Array>] one [column_name, typecaster_or_nil, field_index]
#   triple per column, in schema order. The typecaster is looked up in
#   #typecast_map by the column's schema type. A nil typecaster means the raw
#   string is kept as-is.
def row_convertor
  @row_convertor ||= begin
    names = columns
    casters = metadata.schema.fieldSchemas.map { |field| typecast_map[field.type] }
    names.each_with_index.map { |name, index| [name, casters[index], index] }
  end
end