class PgConduit::RowCollector
A thread safe accumulator, used to chunk an input stream
Public Class Methods
new(chunk_size: 100)
click to toggle source
@param chunk_size [Integer] How many rows should be collected before
yielding
# File lib/pg_conduit/row_collector.rb, line 6 def initialize(chunk_size: 100) @chunk_size = chunk_size @rows = [] @finished = false @mutex = Mutex.new end
Public Instance Methods
<<(row)
click to toggle source
@param row [Object] Row to add to the buffer
# File lib/pg_conduit/row_collector.rb, line 36 def <<(row) @mutex.synchronize do if @finished raise 'Data may not be added to a row collector that has been marked as finished' end @rows << row if @rows.length % @chunk_size == 0 flush(&@callback) end end end
finish()
click to toggle source
Flushes any collected rows, yielding them to the callback and marks the collector as finished. Any subsequent calls to :<< will raise an error.
# File lib/pg_conduit/row_collector.rb, line 51 def finish @mutex.synchronize do flush(&@callback) @finished = true end end
on_chunk(&callback)
click to toggle source
Provide a block to be called with each accumulated chunk
@yield [Array] collected rows @return [self]
@example Print once every ten rows
collector = RowCollector.new(chunk_size: 10) collector.on_chunk { |rows| puts rows } 100.times { |n| collector << n } #> [0,1,2,3,4,5,6,7,8,9] #> [10,11,12,13,14,15,16,17,18,19] #> ...etc
# File lib/pg_conduit/row_collector.rb, line 29 def on_chunk(&callback) self.tap do @mutex.synchronize { @callback = callback } end end
Private Instance Methods
flush() { |rows| ... }
click to toggle source
Yields the collected rows and resets the row collector @yield [Array<Hash>] The collected rows
# File lib/pg_conduit/row_collector.rb, line 62 def flush yield @rows if @rows.length > 0 @rows = [] true end