class RedshiftConnector::DataFileBundleReader
Constants
- DEFAULT_BATCH_SIZE
- REPORT_SIZE
Attributes
batch_size[R]
bundle[R]
logger[R]
Public Class Methods
new(bundle, filter: nil, batch_size: DEFAULT_BATCH_SIZE, logger: RedshiftConnector.logger)
click to toggle source
# File lib/redshift_connector/data_file_bundle_reader.rb, line 10
#
# Builds a reader over a data file bundle.
#
# bundle     -- the data file bundle to read from (provides #data_files).
# filter     -- optional per-row transform lambda; defaults to identity.
# batch_size -- rows per batch; nil falls back to DEFAULT_BATCH_SIZE.
# logger     -- destination for progress messages.
def initialize(bundle, filter: nil, batch_size: DEFAULT_BATCH_SIZE, logger: RedshiftConnector.logger)
  @bundle = bundle
  @filter = filter || lambda {|*row| row }
  # Fall back to the class constant instead of a duplicated magic
  # number (was `|| 1000`), so an explicit nil stays in sync with
  # DEFAULT_BATCH_SIZE if the constant ever changes.
  @batch_size = batch_size || DEFAULT_BATCH_SIZE
  @logger = logger
end
Public Instance Methods
all_data_objects()
click to toggle source
# File lib/redshift_connector/data_file_bundle_reader.rb, line 38
#
# Returns only the bundle's files that are actual data objects,
# skipping entries for which #data_object? is false.
def all_data_objects
  @bundle.data_files.select(&:data_object?)
end
each_batch(report: true) { |rows| ... }
click to toggle source
# File lib/redshift_connector/data_file_bundle_reader.rb, line 44
#
# Yields filtered rows in arrays of @batch_size. When +report+ is
# true, logs progress roughly every REPORT_SIZE rows plus a final
# total line.
def each_batch(report: true)
  total = 0
  last_checkpoint = 0
  do_each_batch(@batch_size) do |rows|
    yield rows
    total += rows.size
    checkpoint = total / REPORT_SIZE
    if checkpoint > last_checkpoint
      @logger.info "#{total} rows processed" if report
      last_checkpoint = checkpoint
    end
  end
  @logger.info "total #{total} rows processed" if report
end
each_object() { |obj| ... }
click to toggle source
# File lib/redshift_connector/data_file_bundle_reader.rb, line 31
#
# Yields each data object in the bundle, logging its S3 key before
# handing it to the caller's block.
def each_object(&block)
  all_data_objects.each do |object|
    @logger.info "processing s3 object: #{object.key}"
    yield object
  end
end
each_row(&block)
click to toggle source
# File lib/redshift_connector/data_file_bundle_reader.rb, line 23
#
# Streams every row of every data object to the given block.
# Aliased as #each.
def each_row(&block)
  each_object {|object| object.each_row(&block) }
end
Also aliased as: each
Private Instance Methods
do_each_batch(batch_size) { |buf| ... }
click to toggle source
# File lib/redshift_connector/data_file_bundle_reader.rb, line 58
#
# Private: runs each row through @filter, accumulates the results
# into arrays of +batch_size+, and yields each full batch. A final
# partial batch is yielded unless it is empty.
def do_each_batch(batch_size)
  transform = @filter
  batch = []
  each_row do |row|
    batch << transform.call(*row)
    next unless batch.size == batch_size
    yield batch
    batch = []
  end
  yield batch unless batch.empty?
end