class Tapsoob::DataStreamKeyed

Attributes

buffer [RW] — read/write accessor for the internal array of buffered rows used by keyed fetches.

Public Class Methods

new(db, state, opts = {}) click to toggle source
Calls superclass method Tapsoob::DataStream::new
# File lib/tapsoob/data_stream.rb, line 305
# Builds a keyed data stream on top of the generic DataStream state.
# Seeds :primary_key (first order-by column of the table) and :filter
# (last key value already buffered, starting at 0) unless the caller
# supplied them, defaults the chunk size, and starts with an empty buffer.
def initialize(db, state, opts = {})
  super(db, state, opts)
  keyed_defaults = {
    :primary_key => order_by(state[:table_name]).first,
    :filter      => 0
  }
  @state = keyed_defaults.merge(@state)
  @state[:chunksize] ||= DEFAULT_CHUNKSIZE
  @buffer = []
end

Public Instance Methods

buffer_limit() click to toggle source
# File lib/tapsoob/data_stream.rb, line 316
# Lower bound (exclusive) for the next keyed fetch.
#
# When a previous run recorded a :last_fetched key that is still behind
# the current :filter and nothing is buffered, resume from :last_fetched
# so no rows are skipped; otherwise continue from :filter.
#
# NOTE: replaced the low-precedence `and` keyword with `&&` — `and` is
# for control flow, not boolean expressions, and is a precedence trap.
def buffer_limit
  if state[:last_fetched] && state[:last_fetched] < state[:filter] && self.buffer.size == 0
    state[:last_fetched]
  else
    state[:filter]
  end
end
calc_limit(chunksize) click to toggle source
# File lib/tapsoob/data_stream.rb, line 324
# Over-fetch factor for a buffered read: when serving over HTTP
# (Sinatra is loaded) fetch only slightly more than a chunk to keep
# responses small; locally, over-selecting is cheap and saves round
# trips, so fetch three chunks at a time.
def calc_limit(chunksize)
  factor = defined?(Sinatra) ? 1.1 : 3
  (chunksize * factor).ceil
end
fetch_buffered(chunksize) click to toggle source
# File lib/tapsoob/data_stream.rb, line 359
# Returns up to +chunksize+ rows from the front of the buffer, topping
# the buffer up first if it cannot satisfy the request. Rows are NOT
# removed here — #increment drops them once they are confirmed sent.
# Also records the primary-key value of the last row handed out in
# state[:last_fetched] (nil when nothing was available) so an
# interrupted stream can resume.
def fetch_buffered(chunksize)
  load_buffer(chunksize) if buffer.size < chunksize
  rows = buffer.slice(0, chunksize)
  state[:last_fetched] = rows.size > 0 ? rows.last[primary_key] : nil
  rows
end
increment(row_count) click to toggle source

# Pulls one chunk of rows out of the keyed buffer and formats it for
# transfer via Tapsoob::Utils.format_data.
def fetch_rows
  chunksize = state[:chunksize]
  rows = fetch_buffered(chunksize) || []
  Tapsoob::Utils.format_data(rows, :string_columns => string_columns)
end

# File lib/tapsoob/data_stream.rb, line 380
# Drops the first +row_count+ rows from the buffer — these are the
# rows just confirmed as successfully sent; the remainder will be
# re-offered by the next fetch.
def increment(row_count)
  buffer.slice!(0, row_count)
end
load_buffer(chunksize) click to toggle source
# File lib/tapsoob/data_stream.rb, line 335
# Refills the buffer by keyset pagination: repeatedly selects rows whose
# primary key is greater than the current buffer_limit (in over-sized
# pages — see #calc_limit) until at least +chunksize+ rows have been
# buffered or a query comes back empty (table exhausted).
def load_buffer(chunksize)
  # make sure BasicObject is not polluted by subsequent requires
  Sequel::BasicObject.remove_methods!

  num = 0
  loop do
    limit = calc_limit(chunksize)
    # we have to use local variables in order for the virtual row filter to work correctly
    key = primary_key
    buf_limit = buffer_limit
    # Keyset pagination: everything past the last buffered key, in order.
    ds = table.order(*order_by).filter { key.sql_number > buf_limit }.limit(limit)
    log.debug "DataStreamKeyed#load_buffer SQL -> #{ds.sql}"
    data = ds.all
    self.buffer += data
    num += data.size
    if data.size > 0
      # keep a record of the last primary key value in the buffer
      state[:filter] = self.buffer.last[ primary_key ]
    end

    # Stop once a full chunk is buffered or the last page was empty.
    break if num >= chunksize or data.size == 0
  end
end
primary_key() click to toggle source
# File lib/tapsoob/data_stream.rb, line 312
# The configured primary-key column from the stream state, normalized
# to a Symbol for use in Sequel expressions.
def primary_key
  key = state[:primary_key]
  key.to_sym
end
verify_stream() click to toggle source
# File lib/tapsoob/data_stream.rb, line 385
# Re-anchors the stream for a verification pass: sets :filter to the
# table's current maximum primary-key value and clears :last_fetched so
# the stream restarts from scratch.
def verify_stream
  pk = primary_key
  dataset = table.order(*order_by)

  # set the current filter to the max of the primary key
  state[:filter] = dataset.max(pk.sql_number)
  # clear out the last_fetched value so it can restart from scratch
  state[:last_fetched] = nil

  log.debug "DataStreamKeyed#verify_stream -> state: #{state.inspect}"
end