class Impala::Cursor
Cursors are used to iterate over result sets without loading them all into memory at once. This can be useful if you're dealing with lots of rows. It implements Enumerable, so you can use each/select/map/etc.
Constants
- BUFFER_SIZE
- NULL
- TYPECAST_MAP
Attributes
Public Class Methods
# File lib/impala/cursor.rb, line 46
# Builds a cursor over the results identified by +handle+.
#
# @param handle the query handle passed along with every call made on +service+
# @param service the client object used to fetch rows, state and metadata
# @param options [Hash] cursor options. +:poll_every+ (default 0.1) is the
#   number of seconds slept between state checks in #wait!. A copy of the
#   hash is also handed to the ProgressReporter.
def initialize(handle, service, options = {})
  @handle, @service = handle, service

  # Fetch state: nothing buffered yet, more rows expected, cursor open.
  @row_buffer, @done, @open = [], false, true

  @typecast_map = TYPECAST_MAP.dup
  @options = options.dup
  @progress_reporter = ProgressReporter.new(self, @options)
  @poll_every = options.fetch(:poll_every, 0.1)
end
# File lib/impala/cursor.rb, line 9
# Converts a raw column value to a boolean.
#
# @param value the raw value to convert
# @return [Boolean] true only when +value+ equals the string 'true'
def self.typecast_boolean(value)
  value == 'true'
end
# File lib/impala/cursor.rb, line 21
# Converts a raw column value to a BigDecimal.
#
# Uses the Kernel#BigDecimal conversion method: BigDecimal.new is deprecated
# and no longer exists in current releases of the bigdecimal library.
#
# @param value [String] the decimal in textual form
# @return [BigDecimal] the parsed decimal
# @raise [ArgumentError] if +value+ is not a valid decimal string
def self.typecast_decimal(value)
  BigDecimal(value)
end
# File lib/impala/cursor.rb, line 17
# Converts a raw column value to a Float via +to_f+.
#
# @param value the raw value to convert (must respond to +to_f+)
# @return [Float] the converted value
def self.typecast_float(value)
  value.to_f
end
# File lib/impala/cursor.rb, line 13
# Converts a raw column value to an Integer via +to_i+.
#
# @param value the raw value to convert (must respond to +to_i+)
# @return [Integer] the converted value
def self.typecast_int(value)
  value.to_i
end
# File lib/impala/cursor.rb, line 25
# Converts a raw column value to a Time using Time.parse.
#
# @param value [String] the timestamp in textual form
# @return [Time] the parsed timestamp
def self.typecast_timestamp(value)
  Time.parse(value)
end
Public Instance Methods
Close the cursor on the remote server. Once a cursor is closed, you can no longer fetch any rows from it.
# File lib/impala/cursor.rb, line 99
# Closes the cursor on the remote service and marks it as closed locally.
#
# Closing is idempotent: #fetch_batch already closes the cursor after the
# final batch, so a later explicit call must not send a second close request
# for the same handle.
#
# @return the result of the service's +close+ call, or nil if the cursor was
#   already closed
def close
  return unless @open

  @open = false
  @service.close(@handle)
end
# File lib/impala/cursor.rb, line 60
# Returns the column names of the result set, taken from the field schemas
# in the result metadata. The list is computed once and then cached.
#
# @return [Array] the name of each field schema, in schema order
def columns
  return @columns if @columns

  @columns = metadata.schema.fieldSchemas.map { |field| field.name }
end
# File lib/impala/cursor.rb, line 68
# Yields each remaining row in turn, fetching rows through #fetch_row until
# it returns nil.
#
# When called without a block an Enumerator is returned instead of raising
# LocalJumpError, so the cursor can be iterated externally.
#
# @yield [row] each remaining row
# @return [Enumerator, nil] an Enumerator when no block is given, otherwise nil
def each
  return enum_for(:each) unless block_given?

  while (row = fetch_row)
    yield row
  end
end
# File lib/impala/cursor.rb, line 134
# Asks the service for the execution summary of this cursor's query.
#
# @return whatever the service's +GetExecSummary+ call returns for the handle
def exec_summary
  handle = @handle
  @service.GetExecSummary(handle)
end
Returns all the remaining rows in the result set. @return [Array<Hash>] the remaining rows in the result set @see fetch_row
# File lib/impala/cursor.rb, line 93
# Collects every remaining row into an array by delegating to +to_a+.
#
# @return [Array] the remaining rows
def fetch_all
  to_a
end
Returns the next available row as a hash, or nil if there are none left. @return [Hash, nil] the next available row, or nil if there are none
left
@see fetch_all
# File lib/impala/cursor.rb, line 78
# Returns the next buffered row, refilling the buffer first when it is empty
# and the result set has not been exhausted.
#
# @return the next row, or nil when the buffer is empty and no rows remain
def fetch_row
  # Only go back for more rows when there is nothing buffered and the
  # result set is not finished.
  fetch_more if @row_buffer.empty? && !@done

  # Shifting an empty buffer yields nil, which signals the end of the results.
  @row_buffer.shift
end
Returns true if there are any more rows to fetch.
# File lib/impala/cursor.rb, line 126
# Tells whether any rows are still available: either the result set has not
# been fully fetched yet or rows remain in the local buffer.
#
# @return [Boolean] false only when fetching is done and the buffer is empty
def has_more?
  exhausted = @done && @row_buffer.empty?
  !exhausted
end
# File lib/impala/cursor.rb 64 def inspect 65 "#<#{self.class}#{open? ? '' : ' (CLOSED)'}>" 66 end
Returns true if the cursor is still open.
# File lib/impala/cursor.rb, line 105
# Reports the cursor's open flag. The flag starts out true and is cleared by
# #close, or when a fetch fails with a Beeswax exception.
#
# @return the current value of the open flag
def open?
  @open
end
Returns the progress for the query, as the fraction of scan ranges completed so far.
# File lib/impala/cursor.rb, line 139
# Returns the query's progress as the fraction of scan ranges completed,
# based on the progress section of #exec_summary.
#
# When the summary carries no progress section, or reports zero total scan
# ranges, 0.0 is returned; the plain division would otherwise raise on nil
# or produce NaN (0.0 / 0.0).
#
# @return [Float] completed scan ranges divided by total scan ranges, or 0.0
#   when that ratio is undefined
def progress
  scan_progress = exec_summary.progress
  return 0.0 if scan_progress.nil?

  total = scan_progress.total_scan_ranges.to_f
  return 0.0 if total.zero?

  scan_progress.num_completed_scan_ranges.to_f / total
end
Returns true if the query has finished running, whether it completed successfully or ended in an exception.
# File lib/impala/cursor.rb, line 110
# Checks whether the query has reached a terminal state, i.e. the service
# reports it as either FINISHED or EXCEPTION.
#
# @return [Boolean] true when the query state is FINISHED or EXCEPTION
def query_done?
  terminal_states = [
    Protocol::Beeswax::QueryState::EXCEPTION,
    Protocol::Beeswax::QueryState::FINISHED
  ]
  current_state = @service.get_state(@handle)
  terminal_states.include?(current_state)
end
# File lib/impala/cursor.rb, line 130
# Asks the service for the runtime profile of this cursor's query.
#
# @return whatever the service's +GetRuntimeProfile+ call returns for the handle
def runtime_profile
  handle = @handle
  @service.GetRuntimeProfile(handle)
end
Blocks until the query is done running.
# File lib/impala/cursor.rb, line 118
# Polls the query state until #query_done? reports true. Between checks the
# periodic callback is run and the thread sleeps for the configured polling
# interval.
#
# @return [nil]
def wait!
  until query_done?
    periodic_callback
    sleep(@poll_every)
  end
end
Private Instance Methods
# File lib/impala/cursor.rb, line 161
# Checks whether the service reports the query as being in the EXCEPTION state.
#
# @return [Boolean] true when the current query state is EXCEPTION
def exceptional?
  current_state = @service.get_state(@handle)
  current_state == Protocol::Beeswax::QueryState::EXCEPTION
end
# File lib/impala/cursor.rb, line 165
# Fetches one batch of up to BUFFER_SIZE raw rows from the service, parses
# them and appends them to the row buffer. When the service reports that no
# more rows follow, the cursor is marked done and closed.
#
# @raise [CursorError] if the cursor is not open, or the fetch fails with a
#   Beeswax exception (the cursor is then marked as not open)
# @raise [ConnectionError] if the query is in the EXCEPTION state
def fetch_batch
  raise CursorError, "Cursor has expired or been closed" unless @open
  raise ConnectionError, "The query was aborted" if exceptional?

  batch =
    begin
      @service.fetch(@handle, false, BUFFER_SIZE)
    rescue Protocol::Beeswax::BeeswaxException
      @open = false
      raise CursorError, "Cursor has expired or been closed"
    end

  # Parse the whole batch before touching the buffer.
  parsed = batch.data.map { |raw| parse_row(raw) }
  @row_buffer.concat(parsed)

  return if batch.has_more

  @done = true
  close
end
# File lib/impala/cursor.rb, line 157
# Keeps fetching batches while the result set is not exhausted and the
# buffer holds fewer than BUFFER_SIZE rows.
def fetch_more
  fetch_batch while !@done && @row_buffer.size < BUFFER_SIZE
end
# File lib/impala/cursor.rb, line 153
# Returns the results metadata for this cursor's handle, requesting it from
# the service on first use and caching it afterwards.
#
# @return the object returned by the service's +get_results_metadata+ call
def metadata
  return @metadata if @metadata

  @metadata = @service.get_results_metadata(@handle)
end
# File lib/impala/cursor.rb, line 185
# Turns one raw, delimiter-separated row into a hash keyed by column name.
#
# The raw string is split on the metadata's delimiter. Each field equal to
# NULL becomes nil; any other field is passed through the column's typecast
# callable when one exists, or kept as the raw string otherwise.
#
# @param raw [String] one raw row as returned by the service
# @return [Hash] column name => converted value
def parse_row(raw)
  # A limit of -1 keeps trailing empty fields. Without it String#split drops
  # them, so a row ending in empty columns would come back short and those
  # columns would be read as nil instead of "".
  fields = raw.split(metadata.delim, -1)

  row_convertor.each_with_object({}) do |(column, caster, index), row|
    value = fields[index]
    row[column] =
      if value == NULL
        nil
      elsif caster
        caster.call(value)
      else
        value
      end
  end
end
# File lib/impala/cursor.rb, line 148
# Emits a progress report, but only when the progress reporter says it
# should be shown.
#
# @return the result of the reporter's +report+ call, or nil when nothing
#   is reported
def periodic_callback
  progress_reporter.report if progress_reporter.show?
end
# File lib/impala/cursor.rb, line 197
# Builds (once) the per-column conversion table used when parsing rows: one
# [column name, typecast callable or nil, field index] triple per column.
# The callable is looked up in the typecast map by the field schema's type.
#
# @return [Array<Array>] the cached conversion table
def row_convertor
  return @row_convertor if @row_convertor

  names = columns
  casters = metadata.schema.fieldSchemas.map { |field| typecast_map[field.type] }
  positions = (0...names.length).to_a
  @row_convertor = names.zip(casters, positions)
end