class RocketJob::Sliced::Writer::Input

Internal class for uploading records into input slices.

Attributes

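record_count [R] — the number of records written so far (read-only). A line consumed by the on_first callback is not counted.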

Public Class Methods

collect(data_store, **args) { |writer| ... }

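Batches lines into slices: creates a writer (passing **args to new), yields it to the block, flushes it when the block exits (even if the block raises), and returns the number of records written.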

Parameters

on_first: [Proc]
  Block to call on the first line only, instead of storing in the slice.
  Useful for extracting the header row
  Default: nil

slice_size: [Integer]
  Overrides the slice size, for example when uploading ranges, where each slice is the
  size of the range itself.
  Default: the data store's slice_size

slice_batch_size: [Integer]
  The number of slices to buffer before bulk loading them with a single insert.
  For smaller slices this significantly improves upload performance.
  Default: 20
  Note: If `slice_batch_size` is too high, it can exceed the maximum BSON block size.
# File lib/rocket_job/sliced/writer/input.rb, line 24
# Creates a writer for +data_store+ (forwarding **args to ::new), yields it to the
# block, and returns the number of records the block wrote. The writer is flushed
# on the way out whether the block finishes normally or raises; if ::new itself
# raises there is no writer and nothing is flushed.
def self.collect(data_store, **args)
  input_writer = nil
  begin
    input_writer = new(data_store, **args)
    yield input_writer
    input_writer.record_count
  ensure
    input_writer.flush unless input_writer.nil?
  end
end
new(data_store, on_first: nil, slice_size: nil, slice_batch_size: nil)
# File lib/rocket_job/sliced/writer/input.rb, line 32
# Sets up an empty writer and opens its first slice.
#
# data_store:       target for slices; its #slice_size is read when no slice_size is given.
# on_first:         optional callable that receives the first line instead of it being stored.
# slice_size:       records per slice; defaults to data_store.slice_size.
# slice_batch_size: number of slices buffered before a bulk insert; defaults to 20.
def initialize(data_store, on_first: nil, slice_size: nil, slice_batch_size: nil)
  @data_store       = data_store
  @on_first         = on_first
  @slice_size       = slice_size || data_store.slice_size
  # Because of `|| 20`, a writer built here always has batching enabled.
  @slice_batch_size = slice_batch_size || 20
  @record_count     = 0
  @batch_count      = 0
  @batch            = []
  new_slice
end

Public Instance Methods

<<(line)
# File lib/rocket_job/sliced/writer/input.rb, line 43
# Writes one line.
#
# While an on_first callback is pending, the line goes to that callback (once) and is
# neither stored nor counted. Otherwise it is appended to the current slice and
# counted; when the slice reaches @slice_size it is handed to #save_slice and a
# fresh slice is started. Returns self so calls can be chained.
def <<(line)
  if @on_first
    @on_first.call(line)
    @on_first = nil
  else
    @slice << line
    @record_count += 1
    slice_full = @slice.size >= @slice_size
    if slice_full
      save_slice
      new_slice
    end
  end
  self
end
flush()
# File lib/rocket_job/sliced/writer/input.rb, line 58
# Writes any buffered data to the data store.
#
# With batching enabled (@slice_batch_size set), the current slice is appended to the
# pending batch when it holds records. The whole batch is then bulk-inserted with
# #insert_many, and the batch bookkeeping is reset. Without batching, the current slice
# is inserted on its own if it holds records.
#
# Empty batches are skipped. A flush can happen with nothing pending: no records were
# written, or the last record exactly filled a batch that #save_slice already flushed
# before the final flush. Some drivers (e.g. the MongoDB Ruby driver) reject a bulk
# insert with no documents.
#
# NOTE(review): @slice is not replaced here. If #flush is called directly in the middle
# of a stream and more lines are written afterwards, the same slice object would be
# persisted again by a later flush.
def flush
  if @slice_batch_size
    @batch << @slice if @slice.size.positive?
    @data_store.insert_many(@batch) unless @batch.empty?
    @batch       = []
    @batch_count = 0
  elsif @slice.size.positive?
    @data_store.insert(@slice)
  end
end
new_slice()
# File lib/rocket_job/sliced/writer/input.rb, line 69
# Starts a fresh, empty slice from the data store. Its first_record_number is one
# past the number of records written so far, so numbering starts at 1.
# Returns the new slice.
def new_slice
  next_record_number = @record_count + 1
  @slice = @data_store.new(first_record_number: next_record_number)
end
save_slice()
# File lib/rocket_job/sliced/writer/input.rb, line 73
# Queues the just-filled slice for persistence.
#
# Without batching, the slice is written immediately via #flush. With batching, the
# slice is counted. When the count reaches @slice_batch_size, #flush is called; #flush
# appends the current slice itself and bulk-inserts the batch. Otherwise the slice is
# only buffered in @batch.
def save_slice
  if @slice_batch_size
    @batch_count += 1
    if @batch_count < @slice_batch_size
      @batch << @slice
    else
      flush
    end
  else
    flush
  end
end