class PostgreSQLCursor::Cursor
Attributes
Public Class Methods
Public: Start a new PostgreSQL cursor query sql - The SQL statement with interpolated values options - hash of processing controls
while: value - Exits loop when block does not return this value. until: value - Exits loop when block returns this value. fraction: 0.1..1.0 - The cursor_tuple_fraction (default 1.0) block_size: 1..n - The number of rows to fetch per db block fetch Defaults to 1000 with_hold - Allows the query to remain open across commit points.
Examples
PostgreSQLCursor::Cursor.new("select ....")
Returns the cursor object when called with new.
# File lib/postgresql_cursor/cursor.rb, line 44 def initialize(sql, options={}) @sql = sql @options = options @connection = @options.fetch(:connection) { ::ActiveRecord::Base.connection } @count = 0 @iterate = options[:instances] ? :each_instance : :each_row @batched = false end
Public Instance Methods
# File lib/postgresql_cursor/cursor.rb, line 222 def cast_types(row) row end
Public: Closes the cursor
# File lib/postgresql_cursor/cursor.rb, line 281 def close @connection.execute("close #{@cursor}") end
# File lib/postgresql_cursor/cursor.rb, line 226 def column_types return nil if ::ActiveRecord::VERSION::MAJOR < 4 return @column_types if @column_types types = {} fields = @result.fields fields.each_with_index do |fname, i| ftype = @result.ftype i fmod = @result.fmod i types[fname] = @connection.get_type_map.fetch(ftype, fmod) do |oid, mod| warn "unknown OID: #{fname}(#{oid}) (#{sql})" if ::ActiveRecord::VERSION::MAJOR <= 4 ::ActiveRecord::ConnectionAdapters::PostgreSQLAdapter::OID::Identity.new else ::ActiveRecord::Type::Value.new end end end @column_types = types end
Public: Yields each row of the result set to the passed block
Yields the row to the block. The row is a hash with symbolized keys.
{colname: value, ....}
Returns the count of rows processed
# File lib/postgresql_cursor/cursor.rb, line 77 def each(&block) if @iterate == :each_row @batched ? self.each_row_batch(&block) : self.each_row(&block) elsif @iterate == :each_array @batched ? self.each_array_batch(&block) : self.each_array(&block) else @batched ? self.each_instance_batch(@type, &block) : self.each_instance(@type, &block) end end
# File lib/postgresql_cursor/cursor.rb, line 94 def each_array(&block) old_iterate = @iterate @iterate = :each_array begin rv = self.each_tuple do |row| block.call(row) end ensure @iterate = old_iterate end rv end
# File lib/postgresql_cursor/cursor.rb, line 127 def each_array_batch(&block) old_iterate = @iterate @iterate = :each_array begin rv = self.each_batch do |batch| block.call(batch) end ensure @iterate = old_iterate end rv end
# File lib/postgresql_cursor/cursor.rb, line 107 def each_instance(klass=nil, &block) klass ||= @type self.each_tuple do |row| if ::ActiveRecord::VERSION::MAJOR < 4 model = klass.send(:instantiate,row) else @column_types ||= column_types model = klass.send(:instantiate, row, @column_types) end block.call(model) end end
# File lib/postgresql_cursor/cursor.rb, line 140 def each_instance_batch(klass=nil, &block) klass ||= @type self.each_batch do |batch| models = batch.map do |row| if ::ActiveRecord::VERSION::MAJOR < 4 model = klass.send(:instantiate, row) else @column_types ||= column_types model = klass.send(:instantiate, row, @column_types) end end block.call(models) end end
# File lib/postgresql_cursor/cursor.rb, line 87 def each_row(&block) self.each_tuple do |row| row = row.symbolize_keys if @options[:symbolize_keys] block.call(row) end end
# File lib/postgresql_cursor/cursor.rb, line 120 def each_row_batch(&block) self.each_batch do |batch| batch.map!(&:symbolize_keys) if @options[:symbolize_keys] block.call(batch) end end
Public: Returns the next row from the cursor, or empty hash if end of results
Returns a row as a hash of {'colname'=>value,…}
# File lib/postgresql_cursor/cursor.rb, line 260 def fetch(options={}) open unless @block fetch_block if @block.size==0 row = @block.shift row = row.symbolize_keys if row && options[:symbolize_keys] row end
Private: Fetches the next block of rows into @block
# File lib/postgresql_cursor/cursor.rb, line 269 def fetch_block(block_size=nil) block_size ||= @block_size ||= @options.fetch(:block_size) { 1000 } @result = @connection.execute("fetch #{block_size} from #{@cursor}") if @iterate == :each_array @block = @result.each_row.collect {|row| row } else @block = @result.collect {|row| row } end end
# File lib/postgresql_cursor/cursor.rb, line 66 def iterate_batched(batched=true) @batched = batched self end
Specify the type to instantiate, or reset to return a Hash
# File lib/postgresql_cursor/cursor.rb, line 54 def iterate_type(type=nil) if type.nil? || type == Hash @iterate = :each_row elsif type == Array @iterate = :each_array else @iterate = :each_instance @type = type end self end
Public: Opens (actually, “declares”) the cursor. Call this before fetching
# File lib/postgresql_cursor/cursor.rb, line 249 def open set_cursor_tuple_fraction @cursor = @options[:cursor_name] || ("cursor_" + SecureRandom.uuid.gsub("-","")) hold = @options[:with_hold] ? 'with hold ' : '' @result = @connection.execute("declare #{@cursor} no scroll cursor #{hold}for #{@sql}") @block = [] end
Returns an array of columns plucked from the result rows. Experimental function, as this could still use too much memory and negate the purpose of this libarary. Should this return a lazy enumerator instead?
# File lib/postgresql_cursor/cursor.rb, line 159 def pluck(*cols) options = cols.last.is_a?(Hash) ? cols.pop : {} @options.merge!(options) @options[:symbolize_keys] = true self.iterate_type(options[:class]) if options[:class] cols = cols.map {|c| c.to_sym } result = [] self.each() do |row| row = row.symbolize_keys if row.is_a?(Hash) result << cols.map { |c| row[c] } end result.flatten! if cols.size == 1 result end
Private: Sets the PostgreSQL cursor_tuple_fraction value = 1.0 to assume all rows will be fetched This is a value between 0.1 and 1.0 (PostgreSQL defaults to 0.1, this library defaults to 1.0) used to determine the expected fraction (percent) of result rows returned the the caller. This value determines the access path by the query planner.
# File lib/postgresql_cursor/cursor.rb, line 298 def set_cursor_tuple_fraction(frac=1.0) @cursor_tuple_fraction ||= @options.fetch(:fraction) { 1.0 } return @cursor_tuple_fraction if frac == @cursor_tuple_fraction @cursor_tuple_fraction = frac @result = @connection.execute("set cursor_tuple_fraction to #{frac}") frac end
Private: Open transaction unless with_hold option, specified
# File lib/postgresql_cursor/cursor.rb, line 286 def with_optional_transaction if @options[:with_hold] yield else @connection.transaction { yield } end end