class PgConduit::RowCollector

A thread-safe accumulator that splits an input stream of rows into fixed-size chunks.
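When rows come from a single enumerable on one thread, Ruby's built-in Enumerable#each_slice already produces the same fixed-size chunks. The collector appears aimed at the case where rows arrive one at a time, possibly from several threads, so it needs its own mutex-guarded buffer. A minimal comparison sketch using only the standard library:

```ruby
# Chunking a single-threaded stream with the standard library alone.
rows = (1..25)
chunks = rows.each_slice(10).to_a

chunks.each { |chunk| p chunk }
# The last chunk holds the 5 leftover rows, which corresponds to what
# RowCollector#finish flushes for rows that never filled a whole chunk.
```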

Public Class Methods

new(chunk_size: 100)

@param chunk_size [Integer] How many rows should be collected before yielding

# File lib/pg_conduit/row_collector.rb, line 6
def initialize(chunk_size: 100)
  @chunk_size = chunk_size
  @rows = []
  @finished = false
  @mutex = Mutex.new
end

Public Instance Methods

<<(row)

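Appends a row to the buffer. When the buffer reaches chunk_size rows, they are passed to the on_chunk callback and the buffer is emptied. Raises a RuntimeError if the collector has already been marked as finished.

@param row [Object] Row to add to the buffer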

# File lib/pg_conduit/row_collector.rb, line 36
def <<(row)
  @mutex.synchronize do
    if @finished
      raise 'Data may not be added to a row collector that has been marked as finished'
    end

    @rows << row
    if @rows.length % @chunk_size == 0
      flush(&@callback)
    end
  end
end

finish()

Flushes any remaining rows, yielding them to the callback, and marks the collector as finished. Any subsequent call to #<< raises an error.

# File lib/pg_conduit/row_collector.rb, line 51
def finish
  @mutex.synchronize do
    flush(&@callback)
    @finished = true
  end
end

on_chunk(&callback)

Registers a block to be called with each accumulated chunk, replacing any previously registered block.

@yield [Array] The collected rows
@return [self]

@example Print once every ten rows

collector = PgConduit::RowCollector.new(chunk_size: 10)
collector.on_chunk { |rows| p rows }

100.times { |n| collector << n }
collector.finish

#> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
#> [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
#> ... (ten chunks in total)

# File lib/pg_conduit/row_collector.rb, line 29
def on_chunk(&callback)
  self.tap do
    @mutex.synchronize { @callback = callback }
  end
end
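The sketch below runs without the pg_conduit gem: it inlines a lightly condensed copy of the methods quoted on this page (modifier `if`s and a brace-form `tap`, same behaviour) as a stand-in class with the hypothetical name `SketchRowCollector`, then feeds it from four producer threads. It assumes the callback only appends to a local array, which is safe here because `flush` runs while the collector's mutex is held.

```ruby
# Stand-in copy of the methods quoted on this page, so the sketch runs
# without the pg_conduit gem. SketchRowCollector is a hypothetical name.
class SketchRowCollector
  def initialize(chunk_size: 100)
    @chunk_size = chunk_size
    @rows = []
    @finished = false
    @mutex = Mutex.new
  end

  def on_chunk(&callback)
    tap { @mutex.synchronize { @callback = callback } }
  end

  def <<(row)
    @mutex.synchronize do
      raise 'Data may not be added to a row collector that has been marked as finished' if @finished

      @rows << row
      flush(&@callback) if (@rows.length % @chunk_size).zero?
    end
  end

  def finish
    @mutex.synchronize do
      flush(&@callback)
      @finished = true
    end
  end

  private

  def flush
    yield @rows if @rows.length > 0
    @rows = []
    true
  end
end

chunks = []
# on_chunk returns self, so registration can be chained onto .new.
collector = SketchRowCollector.new(chunk_size: 10).on_chunk { |rows| chunks << rows }

# Four producers push 26 distinct rows each (104 in total) concurrently.
producers = 4.times.map do |t|
  Thread.new { 26.times { |i| collector << (t * 100 + i) } }
end
producers.each(&:join)

collector.finish # flushes the 4 rows left over after ten full chunks

p chunks.map(&:size) # ten chunks of 10, then one chunk of 4

rejected =
  begin
    collector << :late
    false
  rescue RuntimeError => e
    puts "rejected: #{e.message}"
    true
  end
```

Because the callback runs inside `@mutex.synchronize`, a slow callback (for example, one that writes each chunk to a database) blocks every producer until it returns; this seems to be the price of keeping every chunk exactly chunk_size rows long.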

Private Instance Methods

flush() { |rows| ... }

Yields the collected rows (if there are any) to the given block and resets the buffer.

@yield [Array<Hash>] The collected rows
@return [true]

# File lib/pg_conduit/row_collector.rb, line 62
def flush
  yield @rows if @rows.length > 0
  @rows = []
  true
end
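Because `flush` yields whenever rows are pending, calling `flush(&@callback)` before `on_chunk` has been called passes no block, and Ruby raises LocalJumpError. The standalone sketch below reproduces this with a hypothetical `flush_rows` method that mirrors the yield logic of `flush` (without the instance-variable reset). For the class itself, this suggests registering a callback before the first chunk fills up, or before calling `finish` with rows still buffered.

```ruby
# Hypothetical stand-in that mirrors the yield logic of RowCollector#flush.
def flush_rows(rows)
  yield rows if rows.length > 0
  true
end

callback = nil # what @callback holds when on_chunk was never called

# With an empty buffer nothing is yielded, so the missing block is harmless.
empty_result = flush_rows([], &callback)

# With pending rows, yield runs without a block and raises LocalJumpError.
error =
  begin
    flush_rows([1, 2, 3], &callback)
    nil
  rescue LocalJumpError => e
    e
  end
puts "#{error.class}: #{error.message}"

# With a block registered, the rows are yielded normally.
received = nil
flush_rows([1, 2, 3]) { |rows| received = rows }
```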