class PostgreSQLCursor::Cursor

Attributes

connection[R]
count[R]
options[R]
result[R]
sql[R]

Public Class Methods

new(sql, options={}) click to toggle source

Public: Start a new PostgreSQL cursor query.

sql     - The SQL statement with interpolated values.
options - Hash of processing controls:

while: value    - Exits loop when block does not return this value.
until: value    - Exits loop when block returns this value.
fraction: 0.1..1.0    - The cursor_tuple_fraction (default 1.0)
block_size: 1..n      - The number of rows to fetch per db block fetch
                        Defaults to 1000
with_hold       - Allows the query to remain open across commit points.

Examples

PostgreSQLCursor::Cursor.new("select ....")

Returns the cursor object when called with new.

# File lib/postgresql_cursor/cursor.rb, line 44
# Public: Build a cursor for the given SQL statement. Nothing is sent to the
# database here; this only records the configuration.
#
# sql     - The SQL statement to run through the cursor.
# options - Hash of processing controls (default: {}). Read here:
#           :connection - connection to use; when the key is absent,
#                         ::ActiveRecord::Base.connection is used.
#           :instances  - truthy to iterate with :each_instance instead of
#                         :each_row.
#           The hash is stored by reference and consulted by other methods.
def initialize(sql, options={})
  @sql     = sql
  @options = options
  @count   = 0
  @batched = false
  @iterate = if options[:instances] then :each_instance else :each_row end
  # fetch (not []) so an explicitly supplied :connection is used as given.
  @connection = @options.fetch(:connection) { ::ActiveRecord::Base.connection }
end

Public Instance Methods

cast_types(row) click to toggle source
# File lib/postgresql_cursor/cursor.rb, line 222
# Public: Pass-through for a fetched row.
#
# row - The row to cast.
#
# Returns the very same row object, unchanged.
def cast_types(row)
  row
end
close() click to toggle source

Public: Closes the cursor

# File lib/postgresql_cursor/cursor.rb, line 281
# Public: Closes the cursor by sending a "close" statement for the cursor
# name held in @cursor.
#
# Returns whatever @connection.execute returns.
def close
  statement = "close #{@cursor}"
  @connection.execute(statement)
end
column_types() click to toggle source
# File lib/postgresql_cursor/cursor.rb, line 226
# Public: Builds (and memoizes in @column_types) a map of result column name
# to the type object looked up in the connection's type map.
#
# Each field's OID (ftype) and modifier (fmod) are read from @result. When the
# type map has no entry, a warning is printed and a generic fallback type is
# used; which fallback depends on the ActiveRecord major version.
#
# Returns nil on ActiveRecord < 4, otherwise a Hash of {field_name => type}.
def column_types
  return nil if ::ActiveRecord::VERSION::MAJOR < 4
  return @column_types if @column_types

  @column_types = @result.fields.each_with_index.each_with_object({}) do |(name, position), mapping|
    type_oid  = @result.ftype(position)
    modifier  = @result.fmod(position)
    mapping[name] = @connection.get_type_map.fetch(type_oid, modifier) do |oid, _mod|
      warn "unknown OID: #{name}(#{oid}) (#{sql})"
      if ::ActiveRecord::VERSION::MAJOR > 4
        ::ActiveRecord::Type::Value.new
      else
        ::ActiveRecord::ConnectionAdapters::PostgreSQLAdapter::OID::Identity.new
      end
    end
  end
end
each(&block) click to toggle source

Public: Yields each row of the result set to the passed block

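Yields each row to the block. By default the row is a hash; its keys are symbolized only when the symbolize_keys option is set. Depending on iterate_type, the block may instead receive an array or a model instance (or, when batched, an array of these).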

{colname: value, ....}

Returns the count of rows processed

# File lib/postgresql_cursor/cursor.rb, line 77
# Public: Iterates the result set, dispatching on the configured iteration
# mode (@iterate) and on whether batching is enabled (@batched).
#
# :each_row   -> each_row / each_row_batch
# :each_array -> each_array / each_array_batch
# otherwise   -> each_instance / each_instance_batch, passing @type
#
# block - Receives whatever the selected iterator yields.
#
# Returns the selected iterator's return value.
def each(&block)
  case @iterate
  when :each_row
    @batched ? each_row_batch(&block) : each_row(&block)
  when :each_array
    @batched ? each_array_batch(&block) : each_array(&block)
  else
    @batched ? each_instance_batch(@type, &block) : each_instance(@type, &block)
  end
end
each_array(&block) click to toggle source
# File lib/postgresql_cursor/cursor.rb, line 94
# Public: Iterates via each_tuple with the iteration mode temporarily set to
# :each_array, handing every yielded row to the block.
#
# The previous @iterate value is restored afterwards, even when the iteration
# raises.
#
# Returns the value returned by each_tuple.
def each_array(&block)
  previous_mode = @iterate
  @iterate = :each_array
  each_tuple { |tuple| block.call(tuple) }
ensure
  @iterate = previous_mode
end
each_array_batch(&block) click to toggle source
# File lib/postgresql_cursor/cursor.rb, line 127
# Public: Iterates via each_batch with the iteration mode temporarily set to
# :each_array, handing every yielded batch to the block.
#
# The previous @iterate value is restored afterwards, even when the iteration
# raises.
#
# Returns the value returned by each_batch.
def each_array_batch(&block)
  previous_mode = @iterate
  @iterate = :each_array
  each_batch { |rows| block.call(rows) }
ensure
  @iterate = previous_mode
end
each_instance(klass=nil, &block) click to toggle source
# File lib/postgresql_cursor/cursor.rb, line 107
# Public: Iterates via each_tuple, turning every row into an object by calling
# klass.instantiate, and yields that object to the block.
#
# klass - Class used to instantiate rows (default: the @type set through
#         iterate_type).
#
# On ActiveRecord >= 4 the column type map (column_types, cached in
# @column_types) is passed as a second argument to instantiate.
#
# Returns the value returned by each_tuple.
def each_instance(klass=nil, &block)
  klass ||= @type
  each_tuple do |row|
    record =
      if ::ActiveRecord::VERSION::MAJOR >= 4
        @column_types ||= column_types
        klass.send(:instantiate, row, @column_types)
      else
        klass.send(:instantiate, row)
      end
    block.call(record)
  end
end
each_instance_batch(klass=nil, &block) click to toggle source
# File lib/postgresql_cursor/cursor.rb, line 140
# Public: Iterates via each_batch, turning every row of a batch into an object
# by calling klass.instantiate, and yields the array of objects to the block.
#
# klass - Class used to instantiate rows (default: the @type set through
#         iterate_type).
#
# On ActiveRecord >= 4 the column type map (column_types, cached in
# @column_types) is passed as a second argument to instantiate.
#
# Returns the value returned by each_batch.
def each_instance_batch(klass=nil, &block)
  klass ||= @type
  each_batch do |rows|
    records = rows.map do |row|
      if ::ActiveRecord::VERSION::MAJOR >= 4
        @column_types ||= column_types
        klass.send(:instantiate, row, @column_types)
      else
        klass.send(:instantiate, row)
      end
    end
    block.call(records)
  end
end
each_row(&block) click to toggle source
# File lib/postgresql_cursor/cursor.rb, line 87
# Public: Iterates via each_tuple and yields every row to the block.
#
# When the :symbolize_keys option is truthy, the block receives the result of
# row.symbolize_keys instead of the row itself.
#
# Returns the value returned by each_tuple.
def each_row(&block)
  each_tuple do |tuple|
    block.call(@options[:symbolize_keys] ? tuple.symbolize_keys : tuple)
  end
end
each_row_batch(&block) click to toggle source
# File lib/postgresql_cursor/cursor.rb, line 120
# Public: Iterates via each_batch and yields every batch of rows to the block.
#
# When the :symbolize_keys option is truthy, the rows of the batch are
# replaced in place (map!) by their symbolize_keys result before yielding.
#
# Returns the value returned by each_batch.
def each_row_batch(&block)
  each_batch do |rows|
    rows.map! { |row| row.symbolize_keys } if @options[:symbolize_keys]
    block.call(rows)
  end
end
fetch(options={}) click to toggle source

Public: Returns the next row from the cursor, or nil at the end of the results

Returns a row as a hash of {'colname'=>value,…}

# File lib/postgresql_cursor/cursor.rb, line 260
# Public: Returns the next buffered row, opening the cursor on first use and
# fetching another block of rows whenever the buffer (@block) is empty.
#
# options - Hash local to this call (default: {}):
#           :symbolize_keys - truthy to return row.symbolize_keys.
#
# Returns the next row, or nil when the buffer is still empty after fetching.
def fetch(options={})
  open unless @block
  fetch_block if @block.empty?
  next_row = @block.shift
  return next_row unless next_row && options[:symbolize_keys]

  next_row.symbolize_keys
end
fetch_block(block_size=nil) click to toggle source

Private: Fetches the next block of rows into @block

# File lib/postgresql_cursor/cursor.rb, line 269
# Private: Fetches the next block of rows from the cursor into @block.
#
# block_size - Number of rows to request (default: the memoized @block_size,
#              taken from the :block_size option or 1000). An explicit value
#              does not change the memoized size.
#
# In :each_array mode rows are collected from @result.each_row; otherwise they
# are collected by enumerating @result directly.
#
# Returns the Array of rows stored in @block.
def fetch_block(block_size=nil)
  block_size ||= (@block_size ||= @options.fetch(:block_size) { 1000 })
  @result = @connection.execute("fetch #{block_size} from #{@cursor}")

  rows = @iterate == :each_array ? @result.each_row : @result
  @block = rows.map { |tuple| tuple }
end
iterate_batched(batched=true) click to toggle source
# File lib/postgresql_cursor/cursor.rb, line 66
# Public: Turns batched iteration on or off for subsequent calls to each.
#
# batched - Value stored in @batched (default: true).
#
# Returns self, so the call can be chained.
def iterate_batched(batched=true)
  tap { @batched = batched }
end
iterate_type(type=nil) click to toggle source

Specify the type to instantiate, or reset to return a Hash

# File lib/postgresql_cursor/cursor.rb, line 54
# Public: Selects what each yields.
#
# type - nil or Hash for rows as hashes (:each_row), Array for rows as arrays
#        (:each_array), or any other value, which is remembered in @type and
#        used to instantiate rows (:each_instance). Default: nil.
#
# Note the comparison is by equality with the Hash / Array classes themselves;
# switching back to Hash or Array leaves a previously stored @type in place.
#
# Returns self, so the call can be chained.
def iterate_type(type=nil)
  @iterate =
    if type.nil? || type == Hash
      :each_row
    elsif type == Array
      :each_array
    else
      @type = type
      :each_instance
    end
  self
end
open() click to toggle source

Public: Opens (actually, “declares”) the cursor. Call this before fetching

# File lib/postgresql_cursor/cursor.rb, line 249
# Public: Declares the cursor for @sql and resets the row buffer. Call this
# before fetching.
#
# The cursor name is the :cursor_name option when given, otherwise "cursor_"
# followed by a random UUID with the dashes removed. The :with_hold option
# adds "with hold" to the declaration. set_cursor_tuple_fraction is invoked
# first.
#
# Returns the new, empty row buffer (@block).
def open
  set_cursor_tuple_fraction
  @cursor = @options[:cursor_name] || "cursor_#{SecureRandom.uuid.delete('-')}"
  hold_clause = @options[:with_hold] ? 'with hold ' : ''
  @result = @connection.execute("declare #{@cursor} no scroll cursor #{hold_clause}for #{@sql}")
  @block = []
end
pluck(*cols) click to toggle source

Returns an array of columns plucked from the result rows. Experimental function, as this could still use too much memory and negate the purpose of this library. Should this return a lazy enumerator instead?

# File lib/postgresql_cursor/cursor.rb, line 159
# Public: Collects the named columns from every row yielded by each.
#
# cols - Column names (Strings or Symbols). A trailing Hash is treated as
#        options: it is merged into the cursor's options for this and later
#        iterations, and its :class entry, when present, is passed to
#        iterate_type. :symbolize_keys is always switched on.
#
# The merged options are stored in a new Hash, so the Hash originally handed
# to the cursor is never mutated (and may be frozen).
#
# Returns an Array with one entry per row: the bare value when a single
# column was requested, otherwise an Array of values in the order requested.
# Values that are themselves Arrays are returned intact.
def pluck(*cols)
  overrides = cols.last.is_a?(Hash) ? cols.pop : {}
  # Non-destructive merge: @options may be the caller's own Hash.
  @options = @options.merge(overrides).merge(symbolize_keys: true)
  iterate_type(overrides[:class]) if overrides[:class]
  cols = cols.map(&:to_sym)

  plucked = []
  each do |row|
    row = row.symbolize_keys if row.is_a?(Hash)
    plucked << cols.map { |col| row[col] }
  end

  # Unwrap exactly one level for a single column; a recursive flatten would
  # also splice array-valued columns into the result.
  cols.size == 1 ? plucked.map(&:first) : plucked
end
set_cursor_tuple_fraction(frac=1.0) click to toggle source

Private: Sets the PostgreSQL cursor_tuple_fraction. A value of 1.0 assumes that all rows will be fetched. This is a value between 0.1 and 1.0 (PostgreSQL defaults to 0.1, this library defaults to 1.0) giving the expected fraction of the result rows that will be returned to the caller. The query planner uses this value to choose the access path.

# File lib/postgresql_cursor/cursor.rb, line 298
# Private: Sets PostgreSQL's cursor_tuple_fraction for the connection.
#
# frac - Fraction to set. When omitted (nil), the :fraction option is used,
#        falling back to 1.0 when the option is absent.
#
# The last value sent by this object is remembered in @cursor_tuple_fraction,
# and the SET statement is skipped when the requested value equals it.
# Previously the :fraction option only seeded that memo and was then compared
# with the hard-coded default argument, so the option was overridden with 1.0
# and the default of 1.0 was never actually sent.
#
# Returns the fraction now in effect for this cursor object.
def set_cursor_tuple_fraction(frac=nil)
  frac ||= @options.fetch(:fraction) { 1.0 }
  return frac if frac == @cursor_tuple_fraction

  @cursor_tuple_fraction = frac
  @result = @connection.execute("set cursor_tuple_fraction to  #{frac}")
  frac
end
with_optional_transaction() { || ... } click to toggle source

Private: Runs the block inside a transaction unless the with_hold option is specified

# File lib/postgresql_cursor/cursor.rb, line 286
# Private: Runs the given block, wrapping it in @connection.transaction unless
# the :with_hold option is truthy (in which case the block runs directly).
#
# Returns the block's value when run directly, otherwise whatever
# @connection.transaction returns.
def with_optional_transaction
  return yield if @options[:with_hold]

  @connection.transaction { yield }
end