class RedshiftConnector::DataFileBundleReader

Constants

DEFAULT_BATCH_SIZE
REPORT_SIZE

Attributes

batch_size[R]
bundle[R]
logger[R]

Public Class Methods

new(bundle, filter: nil, batch_size: DEFAULT_BATCH_SIZE, logger: RedshiftConnector.logger)
# File lib/redshift_connector/data_file_bundle_reader.rb, line 10
# Builds a reader over the data files of +bundle+.
#
# @param bundle     object exposing +data_files+ (consumed by #all_data_objects)
# @param filter     callable applied to each row's fields (splatted) before the
#                   row is added to a batch; when nil, the fields are returned
#                   unchanged as an Array
# @param batch_size maximum number of rows per batch handed out by #each_batch;
#                   nil falls back to DEFAULT_BATCH_SIZE
# @param logger     object receiving progress messages via +info+
# @raise [ArgumentError] if batch_size is not a positive Integer
def initialize(bundle, filter: nil, batch_size: DEFAULT_BATCH_SIZE, logger: RedshiftConnector.logger)
  @bundle = bundle
  @filter = filter || lambda {|*row| row }
  # Use the same constant as the keyword default so that an explicit nil
  # behaves exactly like omitting the argument.
  @batch_size = batch_size || DEFAULT_BATCH_SIZE
  # A non-positive size would never satisfy the "batch is full" check in
  # do_each_batch, silently producing one unbounded batch.
  unless @batch_size.is_a?(Integer) && @batch_size > 0
    raise ArgumentError, "batch_size must be a positive Integer, got #{@batch_size.inspect}"
  end
  @logger = logger
end

Public Instance Methods

all_data_objects()
# File lib/redshift_connector/data_file_bundle_reader.rb, line 38
# Returns the bundle's data files whose +data_object?+ predicate is truthy,
# preserving their original order.
def all_data_objects
  @bundle.data_files.select(&:data_object?)
end
each(&block)
Alias for: each_row
each_batch(report: true) { |rows| ... }
# File lib/redshift_connector/data_file_bundle_reader.rb, line 44
# Yields the bundle's filtered rows in batches of at most @batch_size rows
# (built by do_each_batch).
#
# When +report+ is true, logs "<n> rows processed" each time the running total
# crosses a new multiple of REPORT_SIZE (at most once per batch), and logs a
# final "total <n> rows processed" line.
#
# @param report [Boolean] whether to emit progress and total log lines
# @yieldparam rows [Array] one batch of filtered rows
# @return [Enumerator] when called without a block
def each_batch(report: true)
  return enum_for(:each_batch, report: report) unless block_given?
  n = 0
  reported = 0
  do_each_batch(@batch_size) do |rows|
    # Capture the size before yielding: the caller may mutate or clear the
    # batch, which would otherwise corrupt the progress counts.
    count = rows.size
    yield rows
    n += count
    if n / REPORT_SIZE > reported
      @logger.info "#{n} rows processed" if report
      reported = n / REPORT_SIZE
    end
  end
  @logger.info "total #{n} rows processed" if report
end
each_object() { |obj| ... }
# File lib/redshift_connector/data_file_bundle_reader.rb, line 31
# Walks the bundle's data objects (as returned by all_data_objects), logging
# each object's key at info level before handing the object to the block.
# Returns whatever the underlying +each+ returns.
def each_object(&block)
  objects = all_data_objects
  objects.each do |data_object|
    @logger.info("processing s3 object: #{data_object.key}")
    yield(data_object)
  end
end
each_row(&block)
# File lib/redshift_connector/data_file_bundle_reader.rb, line 23
# Yields every row of every data object in the bundle, in order, by
# delegating to each object's +each_row+ (objects are visited through
# each_object). Also available under the alias +each+.
#
# @yieldparam row one row as produced by the data object's +each_row+
# @return [Enumerator] when called without a block; previously a blockless
#   call silently iterated every object and discarded the rows
def each_row(&block)
  return enum_for(:each_row) unless block_given?
  each_object do |obj|
    obj.each_row(&block)
  end
end
Also aliased as: each

Private Instance Methods

do_each_batch(batch_size) { |buf| ... }
# File lib/redshift_connector/data_file_bundle_reader.rb, line 58
# Runs every row through @filter (splatting the row's fields into the call)
# and yields the results in arrays of exactly +batch_size+ entries, followed
# by one final shorter array if rows remain. A fresh array is started after
# each yield, so batches already handed out are never mutated here.
def do_each_batch(batch_size)
  row_filter = @filter
  batch = []
  each_row do |row|
    batch << row_filter.call(*row)
    next unless batch.size == batch_size
    yield batch
    batch = []
  end
  yield batch unless batch.empty?
end