class PgConduit::ParallelStreamReader

A multi-threaded stream reader.

Public Class Methods

new(query_stream, threads: 5, queue_max_size: 1000) click to toggle source

@param query_stream [PgConduit::QueryStream] The stream whose rows will be read
@param threads [Integer] The number of threads to use for workers
@param queue_max_size [Integer] How many rows should be stored in memory in the work queue
# File lib/pg_conduit/parallel_stream_reader.rb, line 8
# Builds a reader that feeds rows from +query_stream+ through a bounded work
# queue to a pool of worker threads.
#
# @param query_stream [PgConduit::QueryStream] source of rows; must respond to #each_row
# @param threads [Integer] number of worker threads; must be a positive Integer
# @param queue_max_size [Integer] maximum number of rows buffered in the work queue
# @raise [ArgumentError] if +threads+ is not a positive Integer (SizedQueue
#   itself also rejects a non-positive +queue_max_size+)
def initialize(query_stream, threads: 5, queue_max_size: 1000)
  # With zero workers nothing ever drains the queue: #read would either drop
  # every row silently or block forever once the queue filled up.
  unless threads.is_a?(Integer) && threads.positive?
    raise ArgumentError, "threads must be a positive Integer (got #{threads.inspect})"
  end

  @queue = SizedQueue.new(queue_max_size)
  @workers = threads
  @stream = query_stream
end

Public Instance Methods

read(&callback) click to toggle source

Read a QueryStream and yield its rows.

@yield [Hash] A single row from the QueryStream. Every row from the stream will be yielded, but order is not guaranteed.
# File lib/pg_conduit/parallel_stream_reader.rb, line 18
# Streams every row of the query through the worker pool and hands each row
# to the given block. The block is called from background threads, so with
# more than one worker it may run concurrently with itself.
#
# Blocks until the reader thread and then every worker thread have been joined.
#
# @yield [row] a single row from the stream; every row is yielded, in no guaranteed order
# @return [Symbol] +:ok+ once all threads have finished
def read(&callback)
  producer = read_stream(@stream)
  consumers = dispatch_workers(&callback)
  # Join the producer first, then each consumer, in dispatch order.
  [producer, *consumers].each(&:join)
  :ok
end

Private Instance Methods

dispatch_worker(&callback) click to toggle source
# File lib/pg_conduit/parallel_stream_reader.rb, line 39
# Starts one worker thread that keeps pulling rows off the queue and handing
# them to +callback+ until the queue has been closed and nothing was handled.
#
# @return [Thread] the running worker
def dispatch_worker(&callback)
  Thread.new do
    loop do
      handled_row = process_next(&callback)
      # A miss only means "finished" once the producer has closed the queue.
      break if !handled_row && @queue.closed?
    end
  end
end
dispatch_workers(&callback) click to toggle source
# File lib/pg_conduit/parallel_stream_reader.rb, line 35
# Starts +@workers+ worker threads that all share the same callback.
#
# @return [Array<Thread>] the started worker threads, in start order
def dispatch_workers(&callback)
  Array(1..@workers).map { |_slot| dispatch_worker(&callback) }
end
process_next(&callback) click to toggle source
# File lib/pg_conduit/parallel_stream_reader.rb, line 48
# Takes the next row off the work queue and passes it to +callback+.
#
# Blocks while the queue is open but empty. Queue#deq returns nil once the
# queue is closed and drained; that is reported as "nothing handled".
# The callback runs on the calling thread, and any exception it raises
# propagates to the caller.
#
# @return [Boolean] true if a row was handed to +callback+, false otherwise
def process_next(&callback)
  row = @queue.deq
  return false unless row

  # Called directly: spawning a throwaway thread per row and joining it
  # immediately adds overhead without any extra concurrency.
  callback.call(row)
  true
end
read_stream(query_stream) click to toggle source
# File lib/pg_conduit/parallel_stream_reader.rb, line 28
# Starts the producer thread. It pushes every row of +query_stream+ onto the
# work queue, then closes the queue so workers know no more rows are coming.
#
# The queue is closed in an +ensure+. If +each_row+ raises, workers blocked in
# Queue#deq are released instead of waiting forever. The exception itself
# still surfaces when the returned thread is joined.
#
# @param query_stream [#each_row] source of rows
# @return [Thread] the producer thread
def read_stream(query_stream)
  Thread.new do
    begin
      query_stream.each_row { |row| @queue << row }
    ensure
      @queue.close
    end
  end
end