# File lib/tapsoob/data_stream.rb, line 312
# Returns the primary-key column for the streamed table, read from the
# stream state and coerced to a Symbol (the state hash may carry it as a
# String after a round-trip through serialization).
def primary_key
  state[:primary_key].to_sym
end
class Tapsoob::DataStreamKeyed
Attributes
buffer[RW]
Public Class Methods
new(db, state, opts = {})
click to toggle source
Calls superclass method
Tapsoob::DataStream::new
# File lib/tapsoob/data_stream.rb, line 305
# Builds a keyed data stream on top of Tapsoob::DataStream.
#
# db    - the Sequel database handle (passed through to super)
# state - stream state hash; :table_name is read to derive the primary key
# opts  - extra options forwarded to the superclass
#
# Seeds the state with the table's first ORDER BY column as :primary_key
# and a :filter of 0 (values already present in @state win via merge),
# ensures a :chunksize, and starts with an empty row buffer.
def initialize(db, state, opts = {})
  super(db, state, opts)
  defaults = {
    :primary_key => order_by(state[:table_name]).first,
    :filter      => 0
  }
  # merge order matters: caller-supplied @state overrides the defaults
  @state = defaults.merge(@state)
  @state[:chunksize] ||= DEFAULT_CHUNKSIZE
  @buffer = []
end
Public Instance Methods
buffer_limit()
click to toggle source
# File lib/tapsoob/data_stream.rb, line 316
# Lower bound (exclusive) on the primary-key value for the next fetch.
#
# Normally this is state[:filter] (the last key already buffered). But if
# a previous send stopped partway — :last_fetched is set, lies behind the
# filter, and the buffer has been fully drained — resume from
# :last_fetched so no rows are skipped.
def buffer_limit
  last = state[:last_fetched]
  if last && last < state[:filter] && buffer.size == 0
    last
  else
    state[:filter]
  end
end
calc_limit(chunksize)
click to toggle source
# File lib/tapsoob/data_stream.rb, line 324
# Converts the requested +chunksize+ into a SQL LIMIT.
#
# Inside a Sinatra server we fetch barely more than is needed (10% slack);
# running locally it is cheap to over-fetch, so grab three chunks' worth
# to keep the buffer warm.
def calc_limit(chunksize)
  factor = defined?(Sinatra) ? 1.1 : 3
  (chunksize * factor).ceil
end
fetch_buffered(chunksize)
click to toggle source
# File lib/tapsoob/data_stream.rb, line 359
# Returns up to +chunksize+ rows from the front of the buffer, topping the
# buffer up from the database first when it cannot satisfy the request.
#
# Records the primary-key value of the last row handed out in
# state[:last_fetched] (nil when no rows were available) so an interrupted
# transfer can resume. Does NOT consume the rows — see #increment.
def fetch_buffered(chunksize)
  load_buffer(chunksize) if buffer.size < chunksize
  rows = buffer.slice(0, chunksize)
  state[:last_fetched] = rows.size > 0 ? rows.last[primary_key] : nil
  rows
end
increment(row_count)
click to toggle source
# Fetches the next chunk of buffered rows and formats them for transport.
#
# Reads the chunk size from the stream state, pulls that many rows via
# #fetch_buffered (falling back to an empty set), and hands them to
# Tapsoob::Utils.format_data with the table's string columns so binary
# string data is encoded correctly.
def fetch_rows
  chunksize = state[:chunksize]
  rows = fetch_buffered(chunksize) || []
  Tapsoob::Utils.format_data(rows, :string_columns => string_columns)
end
# File lib/tapsoob/data_stream.rb, line 380
# Acknowledges +row_count+ rows as successfully sent by popping them off
# the front of the buffer, so the next fetch starts after them.
def increment(row_count)
  # pop the rows we just successfully sent off the buffer
  @buffer.slice!(0, row_count)
end
load_buffer(chunksize)
click to toggle source
# File lib/tapsoob/data_stream.rb, line 335
# Refills the row buffer from the table until it has gained at least
# +chunksize+ rows (or the table is exhausted), paging by primary key.
#
# Each page selects rows whose primary key is greater than #buffer_limit,
# ordered by the table's ORDER BY columns, LIMITed via #calc_limit.
# state[:filter] tracks the last primary-key value buffered so the next
# page resumes after it.
def load_buffer(chunksize)
  # make sure BasicObject is not polluted by subsequent requires
  Sequel::BasicObject.remove_methods!

  fetched = 0
  loop do
    limit = calc_limit(chunksize)
    # we have to use local variables in order for the virtual row filter to work correctly
    key       = primary_key
    buf_limit = buffer_limit
    ds = table.order(*order_by).filter { key.sql_number > buf_limit }.limit(limit)
    log.debug "DataStreamKeyed#load_buffer SQL -> #{ds.sql}"

    page = ds.all
    self.buffer += page
    fetched += page.size
    if page.size > 0
      # keep a record of the last primary key value in the buffer
      state[:filter] = buffer.last[primary_key]
    end
    # stop once enough rows were gathered or the table ran dry
    break if fetched >= chunksize || page.size == 0
  end
end
primary_key()
click to toggle source
verify_stream()
click to toggle source
# File lib/tapsoob/data_stream.rb, line 385
# Resets the stream for a verification pass: points the filter at the
# table's maximum primary-key value and forgets the resume position.
def verify_stream
  key = primary_key
  ds  = table.order(*order_by)

  # set the current filter to the max of the primary key
  state[:filter] = ds.max(key.sql_number)
  # clear out the last_fetched value so it can restart from scratch
  state[:last_fetched] = nil

  log.debug "DataStreamKeyed#verify_stream -> state: #{state.inspect}"
end