class Impala::Cursor

Cursors are used to iterate over result sets without loading them all into memory at once. This can be useful if you're dealing with lots of rows. It implements Enumerable, so you can use each/select/map/etc.

Constants

BUFFER_SIZE
NULL
TYPECAST_MAP

Attributes

handle[R]
progress_reporter[R]
typecast_map[R]

Public Class Methods

new(handle, service, options = {}) click to toggle source
   # File lib/impala/cursor.rb
def initialize(handle, service, options = {})
  # Identity of the server-side query this cursor reads from.
  @handle = handle
  @service = service

  # Local buffering state: rows fetched but not yet handed out, whether the
  # server has reported the end of the result set, and whether the handle
  # may still be used.
  @row_buffer = []
  @done = false
  @open = true

  # Per-instance copy so callers can tweak typecasting without mutating the
  # shared TYPECAST_MAP constant.
  @typecast_map = TYPECAST_MAP.dup
  @options = options.dup
  @progress_reporter = ProgressReporter.new(self, @options)

  # Seconds to sleep between state polls in #wait!.
  @poll_every = options.fetch(:poll_every, 0.1)
end
typecast_boolean(value) click to toggle source
   # File lib/impala/cursor.rb
 # Converts Impala's textual boolean into true/false; only the exact
 # string 'true' is treated as true.
 def self.typecast_boolean(value)
   'true' == value
 end
typecast_decimal(value) click to toggle source
   # File lib/impala/cursor.rb
# Converts a textual DECIMAL value into a BigDecimal.
#
# @param value [String] the decimal as returned by the server
# @return [BigDecimal]
def self.typecast_decimal(value)
  # BigDecimal.new was deprecated in Ruby 2.5 and removed in bigdecimal 2.0
  # (shipped with Ruby 2.7+); Kernel#BigDecimal is the supported constructor.
  BigDecimal(value)
end
typecast_float(value) click to toggle source
   # File lib/impala/cursor.rb
# Converts a textual FLOAT/DOUBLE value via String#to_f, which is lenient:
# non-numeric text yields 0.0 instead of raising.
def self.typecast_float(value)
  value.to_f
end
typecast_int(value) click to toggle source
   # File lib/impala/cursor.rb
# Converts a textual integer column via String#to_i, which is lenient:
# non-numeric text yields 0 instead of raising.
def self.typecast_int(value)
  value.to_i
end
typecast_timestamp(value) click to toggle source
   # File lib/impala/cursor.rb
# Parses a textual TIMESTAMP with Time.parse.
# NOTE(review): strings without a zone offset are interpreted in the
# process's local time zone — confirm that matches how Impala renders them.
def self.typecast_timestamp(value)
  Time.parse(value)
end

Public Instance Methods

close() click to toggle source

Close the cursor on the remote server. Once a cursor is closed, you can no longer fetch any rows from it.

    # File lib/impala/cursor.rb
 # Closes the query handle on the server; no further rows can be fetched.
 #
 # Idempotent: the cursor already closes itself when the result set is
 # exhausted (and marks itself closed when the server rejects the handle),
 # so a later explicit #close must not send a second close for the same
 # handle.
 #
 # @return the service's close result, or nil if already closed
 def close
   return unless @open

   @open = false
   @service.close(@handle)
 end
columns() click to toggle source
   # File lib/impala/cursor.rb
# Column names of the result set, in schema order; computed once from the
# result metadata and cached.
def columns
  return @columns if @columns

  field_schemas = metadata.schema.fieldSchemas
  @columns = field_schemas.map { |field| field.name }
end
each() { |row| ... } click to toggle source
   # File lib/impala/cursor.rb
# Yields each remaining row (a Hash keyed by column name), fetching further
# batches from the server as needed.
#
# @yieldparam row [Hash]
# @return [self, Enumerator] self when a block is given, otherwise an
#   Enumerator over the remaining rows
def each
  # Enumerable-style: without a block, hand back an Enumerator instead of
  # raising LocalJumpError at the first yield.
  return enum_for(:each) unless block_given?

  # fetch_row returns nil only once both the buffer and the server are
  # exhausted; rows are always Hashes, so nil is an unambiguous end marker.
  while (row = fetch_row)
    yield row
  end

  self
end
exec_summary() click to toggle source
    # File lib/impala/cursor.rb
# Execution summary for this query, as returned by the service's
# GetExecSummary call.
def exec_summary
  service = @service
  service.GetExecSummary(@handle)
end
fetch_all() click to toggle source

Returns all the remaining rows in the result set. @return [Array<Hash>] the remaining rows in the result set @see fetch_row

   # File lib/impala/cursor.rb
# Drains the cursor, returning every remaining row as an Array of Hashes.
def fetch_all
  to_a
end
fetch_row() click to toggle source

Returns the next available row as a hash, or nil if there are none left. @return [Hash, nil] the next available row, or nil if there are none left @see fetch_all

   # File lib/impala/cursor.rb
# Next buffered row, refilling the buffer from the server when it runs dry.
# Returns nil once the buffer is empty and the server has no more rows.
def fetch_row
  # Only go back to the server when nothing is buffered and more may exist;
  # when @done is set, shifting the empty buffer yields nil.
  fetch_more if @row_buffer.empty? && !@done
  @row_buffer.shift
end
has_more?() click to toggle source

Returns true if there are any more rows to fetch.

    # File lib/impala/cursor.rb
# True unless the server is exhausted AND nothing remains in the local buffer.
def has_more?
  !(@done && @row_buffer.empty?)
end
inspect() click to toggle source
   # File lib/impala/cursor.rb
# Short description: class name plus a CLOSED marker once the cursor is no
# longer open (instance variables such as the row buffer are not shown).
def inspect
  state_marker = open? ? '' : ' (CLOSED)'
  "#<#{self.class}#{state_marker}>"
end
open?() click to toggle source

Returns true if the cursor is still open.

    # File lib/impala/cursor.rb
# Whether the cursor may still fetch rows. Becomes false after #close, after
# the result set is exhausted, or after the server rejects the handle
# during a fetch.
def open?
  @open
end
progress() click to toggle source

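Returns the progress for the query as a Float: the number of completed scan ranges divided by the total number of scan ranges, as reported by the query's exec summary.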

    # File lib/impala/cursor.rb
# Fraction of scan ranges completed so far, taken from the exec summary.
#
# @return [Float] completed / total scan ranges; 0.0 when the summary
#   carries no progress information or reports zero total scan ranges
def progress
  scan_progress = exec_summary.progress
  # NOTE(review): progress appears to be an optional field of the exec
  # summary; treat its absence as "no progress yet" rather than crashing.
  return 0.0 if scan_progress.nil?

  total = scan_progress.total_scan_ranges.to_f
  # Queries without scan ranges report a total of 0; 0.0 / 0.0 would be NaN.
  return 0.0 if total.zero?

  scan_progress.num_completed_scan_ranges.to_f / total
end
query_done?() click to toggle source

Returns true once the query has stopped running, i.e. it has either FINISHED or ended in an EXCEPTION state. In the EXCEPTION case, fetching rows raises ConnectionError.

    # File lib/impala/cursor.rb
# True once the server reports the query as FINISHED or failed (EXCEPTION).
# Counting EXCEPTION as done lets #wait! stop polling a failed query.
def query_done?
  state = @service.get_state(@handle)
  state == Protocol::Beeswax::QueryState::EXCEPTION ||
    state == Protocol::Beeswax::QueryState::FINISHED
end
runtime_profile() click to toggle source
    # File lib/impala/cursor.rb
# Runtime profile for this query, as returned by the service's
# GetRuntimeProfile call.
def runtime_profile
  service = @service
  service.GetRuntimeProfile(@handle)
end
wait!() click to toggle source

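Blocks until the query is done running (finished or failed). It sleeps poll_every seconds (default 0.1) between state checks and gives the progress reporter a chance to report on each iteration.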

    # File lib/impala/cursor.rb
# Polls the server until the query is done, sleeping @poll_every seconds
# between checks and giving the progress reporter a chance to report.
def wait!
  loop do
    break if query_done?

    periodic_callback
    sleep @poll_every
  end
end

Private Instance Methods

exceptional?() click to toggle source
    # File lib/impala/cursor.rb
# True when the server reports that the query ended in the EXCEPTION state.
def exceptional?
  current_state = @service.get_state(@handle)
  current_state == Protocol::Beeswax::QueryState::EXCEPTION
end
fetch_batch() click to toggle source
    # File lib/impala/cursor.rb
# Fetches one batch of up to BUFFER_SIZE rows from the server, parses them
# and appends them to the row buffer. When the server reports no further
# rows, marks the cursor done and closes it.
#
# @raise [CursorError] if the cursor is closed or the server rejects the handle
# @raise [ConnectionError] if the query ended in the EXCEPTION state
def fetch_batch
  # Fail fast rather than sending a request for a handle known to be unusable.
  raise CursorError, "Cursor has expired or been closed" unless @open
  raise ConnectionError, "The query was aborted" if exceptional?

  response =
    begin
      @service.fetch(@handle, false, BUFFER_SIZE)
    rescue Protocol::Beeswax::BeeswaxException
      # The server rejected the handle; remember that so later calls fail fast.
      @open = false
      raise CursorError, "Cursor has expired or been closed"
    end

  # Parse the whole batch before buffering, so a parse failure leaves the
  # buffer untouched.
  parsed = response.data.map { |raw| parse_row(raw) }
  @row_buffer.concat(parsed)

  return if response.has_more

  # Result set exhausted: stop fetching and close the cursor (which closes
  # the handle on the service).
  @done = true
  close
end
fetch_more() click to toggle source
    # File lib/impala/cursor.rb
# Pulls batches until at least BUFFER_SIZE rows are buffered or the server
# has reported the end of the result set.
def fetch_more
  while !@done && @row_buffer.size < BUFFER_SIZE
    fetch_batch
  end
end
metadata() click to toggle source
    # File lib/impala/cursor.rb
# Result-set metadata, fetched from the service on first use and cached.
def metadata
  return @metadata if @metadata

  @metadata = @service.get_results_metadata(@handle)
end
parse_row(raw) click to toggle source
    # File lib/impala/cursor.rb
# Converts one delimited row string from the server into a Hash keyed by
# column name, applying each column's typecaster (if any). Fields equal to
# NULL become nil.
#
# @param raw [String] the row, fields separated by metadata.delim
# @return [Hash{String => Object}]
def parse_row(raw)
  # A negative limit keeps trailing empty fields; without it String#split
  # drops them and trailing empty strings would come back as nil.
  fields = raw.split(metadata.delim, -1)

  row_convertor.each_with_object({}) do |(column, caster, index), row|
    value = fields[index]
    row[column] =
      if value == NULL
        nil
      elsif caster
        caster.call(value)
      else
        value
      end
  end
end
periodic_callback() click to toggle source
    # File lib/impala/cursor.rb
# Invoked by #wait! between polls; asks the progress reporter to report,
# but only when its show? check passes.
def periodic_callback
  progress_reporter.report if progress_reporter.show?
end
row_convertor() click to toggle source
    # File lib/impala/cursor.rb
# Cached list of [column_name, typecaster_or_nil, field_index] triples, one
# per column, used by #parse_row to build each row Hash.
def row_convertor
  @row_convertor ||= begin
    casters = metadata.schema.fieldSchemas.map { |schema| typecast_map[schema.type] }
    columns.each_with_index.map { |name, index| [name, casters[index], index] }
  end
end