class Impala::Cursor
Cursors are used to iterate over result sets without loading them all into memory at once. This can be useful if you're dealing with lots of rows. It implements Enumerable, so you can use each/select/map/etc.
Constants
- BUFFER_SIZE
- NULL
- TYPECAST_MAP
Attributes
Public Class Methods
# File lib/impala/cursor.rb, line 46
# Builds a cursor over the result set identified by +handle+.
#
# @param handle the query handle passed to every service call
# @param service the service object used to fetch, inspect and close the query
# @param options [Hash] copied into @options and handed to the
#   ProgressReporter; :poll_every (default 0.1) is the number of seconds
#   wait! sleeps between state checks
def initialize(handle, service, options = {})
  @handle, @service = handle, service

  @row_buffer = []
  @done, @open = false, true
  @typecast_map = TYPECAST_MAP.dup
  @options = options.dup
  # The reporter receives the cursor itself, so it is built only after the
  # state above has been set up.
  @progress_reporter = ProgressReporter.new(self, @options)
  @poll_every = options.fetch(:poll_every, 0.1)
end
# File lib/impala/cursor.rb, line 9
# Converts a raw boolean field into a Ruby boolean.
#
# @param value the raw field value
# @return [Boolean] true only when +value+ equals the string 'true'
def self.typecast_boolean(value)
  value == 'true'
end
# File lib/impala/cursor.rb, line 21
# Converts a raw decimal field into a BigDecimal.
#
# Uses Kernel#BigDecimal rather than BigDecimal.new, which was deprecated and
# then removed from the bigdecimal library (2.0, shipped with Ruby 2.7).
#
# @param value [String] the raw field value
# @return [BigDecimal] the parsed decimal
def self.typecast_decimal(value)
  BigDecimal(value)
end
# File lib/impala/cursor.rb, line 17
# Converts a raw floating-point field via +to_f+.
#
# @param value the raw field value (must respond to +to_f+)
# @return [Float] the converted value
def self.typecast_float(value)
  value.to_f
end
# File lib/impala/cursor.rb, line 13
# Converts a raw integer field via +to_i+.
#
# @param value the raw field value (must respond to +to_i+)
# @return [Integer] the converted value
def self.typecast_int(value)
  value.to_i
end
# File lib/impala/cursor.rb, line 25
# Converts a raw timestamp field into a Time using Time.parse.
#
# @param value [String] the raw field value
# @return [Time] the parsed time
def self.typecast_timestamp(value)
  Time.parse(value)
end
Public Instance Methods
Close the cursor on the remote server. Once a cursor is closed, you can no longer fetch any rows from it.
# File lib/impala/cursor.rb, line 98
# Closes the cursor on the remote service.
#
# Closing is idempotent: fetch_batch already closes the cursor once the
# result set is exhausted, so a later explicit call must not send a second
# close for the same handle.
#
# @return the service's close result, or nil if the cursor was already closed
def close
  return unless @open

  # Mark closed first so the cursor stays closed even if the service call raises.
  @open = false
  @service.close(@handle)
end
# File lib/impala/cursor.rb, line 59
# Names of the result columns, taken from the field schemas of the result
# metadata. The list is computed once and memoized.
#
# @return [Array] the +name+ of each field schema, in schema order
def columns
  @columns ||= metadata.schema.fieldSchemas.map { |field| field.name }
end
# File lib/impala/cursor.rb, line 67
# Yields each remaining row, pulling them one at a time through fetch_row
# until it returns nil.
#
# @yieldparam row the next row returned by fetch_row
# @return [Enumerator] when called without a block
# @return [nil] when called with a block
def each
  # Without a block, hand back an Enumerator instead of raising LocalJumpError.
  return enum_for(:each) unless block_given?

  while (row = fetch_row)
    yield row
  end
end
# File lib/impala/cursor.rb, line 133
# Asks the service for the execution summary of this cursor's query.
#
# @return whatever the service's GetExecSummary call returns for the handle
def exec_summary
  @service.GetExecSummary(@handle)
end
Returns all the remaining rows in the result set. @return [Array<Hash>] the remaining rows in the result set @see fetch_row
# File lib/impala/cursor.rb, line 92
# Drains the cursor and returns every remaining row as an array.
#
# @return [Array] the remaining rows, as produced by +to_a+
def fetch_all
  to_a
end
Returns the next available row as a hash, or nil if there are none left. @return [Hash, nil] the next available row, or nil if there are none
left
@see fetch_all
# File lib/impala/cursor.rb, line 77
# Returns the next buffered row, refilling the buffer first when it is empty
# and the result set is not yet exhausted.
#
# @return the next row, or nil once no rows are left
def fetch_row
  fetch_more if @row_buffer.empty? && !@done

  # shift on an empty buffer yields nil, which covers the "done" case.
  @row_buffer.shift
end
Returns true if there are any more rows to fetch.
# File lib/impala/cursor.rb, line 125
# Reports whether rows may still be fetched: either the result set has not
# been marked done, or rows are still waiting in the local buffer.
#
# @return [Boolean]
def has_more?
  !(@done && @row_buffer.empty?)
end
# File lib/impala/cursor.rb, line 63
# Short description of the cursor: class name, the query id when a handle is
# present, and a " (CLOSED)" marker once the cursor is no longer open.
#
# @return [String]
def inspect
  query_id = handle ? " QueryID: #{handle.id}" : ''
  closed_marker = open? ? '' : ' (CLOSED)'

  "#<#{self.class}#{query_id}#{closed_marker}>"
end
Returns true if the cursor is still open.
# File lib/impala/cursor.rb, line 104
# Reports the cursor's open flag; it is cleared by close and when a fetch
# fails with a Beeswax exception.
#
# @return [Boolean] the current value of @open
def open?
  @open
end
Returns the progress for the query.
# File lib/impala/cursor.rb, line 138
# Fraction of scan ranges completed so far, taken from the execution summary.
#
# @return [Float] completed scan ranges divided by total scan ranges, or 0.0
#   when the summary reports no scan ranges at all
def progress
  scan_progress = exec_summary.progress
  total = scan_progress.total_scan_ranges.to_f
  # Dividing 0.0 by 0.0 would yield NaN, which breaks any caller that rounds
  # or formats the value; report "no progress" instead.
  return 0.0 unless total > 0

  scan_progress.num_completed_scan_ranges.to_f / total
end
Returns true if the query has finished running, either successfully (so results can be fetched) or with an exception.
# File lib/impala/cursor.rb, line 109
# Reports whether the query has stopped running, i.e. the service reports
# its state as either EXCEPTION or FINISHED.
#
# @return [Boolean]
def query_done?
  terminal_states = [
    Protocol::Beeswax::QueryState::EXCEPTION,
    Protocol::Beeswax::QueryState::FINISHED
  ]
  current_state = @service.get_state(@handle)

  terminal_states.include?(current_state)
end
# File lib/impala/cursor.rb, line 129
# Asks the service for the runtime profile of this cursor's query.
#
# @return whatever the service's GetRuntimeProfile call returns for the handle
def runtime_profile
  @service.GetRuntimeProfile(@handle)
end
Blocks until the query is done running.
# File lib/impala/cursor.rb, line 117
# Polls the query state until query_done? is true. Between checks it runs
# periodic_callback and then sleeps for @poll_every seconds.
#
# @return [nil]
def wait!
  until query_done?
    periodic_callback
    sleep(@poll_every)
  end
end
Private Instance Methods
# File lib/impala/cursor.rb, line 160
# Reports whether the service currently lists the query in the EXCEPTION state.
#
# @return [Boolean]
def exceptional?
  current_state = @service.get_state(@handle)
  current_state == Protocol::Beeswax::QueryState::EXCEPTION
end
# File lib/impala/cursor.rb, line 164
# Fetches up to BUFFER_SIZE raw rows from the service, parses them and
# appends them to the row buffer. When the service reports no further rows,
# the cursor is marked done and closed.
#
# @raise [CursorError] if the cursor is not open, or the fetch fails with a
#   Beeswax exception (the cursor is then marked as no longer open)
# @raise [ConnectionError] if the query is in the EXCEPTION state
def fetch_batch
  raise CursorError, "Cursor has expired or been closed" unless @open
  raise ConnectionError, "The query was aborted" if exceptional?

  response =
    begin
      @service.fetch(@handle, false, BUFFER_SIZE)
    rescue Protocol::Beeswax::BeeswaxException
      # Only the fetch itself is guarded; the handle is treated as gone.
      @open = false
      raise CursorError, "Cursor has expired or been closed"
    end

  parsed_rows = response.data.map { |raw| parse_row(raw) }
  @row_buffer.concat(parsed_rows)
  return if response.has_more

  @done = true
  close
end
# File lib/impala/cursor.rb, line 156
# Keeps fetching batches until the result set is exhausted or the buffer
# holds at least BUFFER_SIZE rows.
def fetch_more
  until @done || @row_buffer.size >= BUFFER_SIZE
    fetch_batch
  end
end
# File lib/impala/cursor.rb, line 152
# Result-set metadata for this cursor's handle, requested from the service
# on first use and memoized afterwards.
#
# @return whatever the service's get_results_metadata call returns
def metadata
  return @metadata if @metadata

  @metadata = @service.get_results_metadata(@handle)
end
# File lib/impala/cursor.rb, line 184
# Splits one raw row on the metadata delimiter and builds a hash keyed by
# column name, applying each column's typecast where one is configured.
#
# @param raw [String] one delimited row as returned by the service
# @return [Hash] column name => typecast value; a field equal to NULL maps
#   to nil, and a column without a typecast keeps its raw string
def parse_row(raw)
  # A negative limit keeps trailing empty fields; a plain split would drop
  # them, turning a trailing empty-string column into nil.
  fields = raw.split(metadata.delim, -1)

  row_convertor.each_with_object({}) do |(column, caster, index), row|
    value = fields[index]
    row[column] =
      if value == NULL
        nil
      elsif caster
        caster.call(value)
      else
        value
      end
  end
end
# File lib/impala/cursor.rb, line 147
# Lets the progress reporter emit a report, but only when the reporter says
# it should be shown.
#
# @return the reporter's report result, or nil when nothing was reported
def periodic_callback
  progress_reporter.report if progress_reporter.show?
end
# File lib/impala/cursor.rb, line 196
# Per-column conversion table used by parse_row, built once and memoized.
#
# @return [Array<Array>] one [column name, typecast or nil, field index]
#   triple per column, in column order
def row_convertor
  @row_convertor ||= begin
    names = columns
    casters = metadata.schema.fieldSchemas.map { |field| typecast_map[field.type] }
    positions = names.each_index.to_a

    names.zip(casters, positions)
  end
end