class PgConduit::ParallelStreamReader
A multi-threaded stream reader.
Public Class Methods
new(query_stream, threads: 5, queue_max_size: 1000)
click to toggle source
@param query_stream [PgConduit::QueryStream] @param threads [Integer] The number of threads to use for workers @param queue_max_size [Integer] How many rows should be stored in memory
in the work queue.
# File lib/pg_conduit/parallel_stream_reader.rb, line 8
#
# Set up the bounded work queue and remember the stream and worker count.
#
# @param query_stream [PgConduit::QueryStream] stream whose rows will be read
# @param threads [Integer] number of worker threads to use; must be at least 1
# @param queue_max_size [Integer] how many rows may be held in memory in the
#   work queue at once
# @raise [ArgumentError] if +threads+ is less than 1
def initialize(query_stream, threads: 5, queue_max_size: 1000)
  # With no workers nothing ever drains the queue, so rows would either be
  # dropped silently or the reader would block on a full queue. Fail fast.
  raise ArgumentError, 'threads must be at least 1' unless threads >= 1

  @queue = SizedQueue.new(queue_max_size)
  @workers = threads
  @stream = query_stream
end
Public Instance Methods
read(&callback)
click to toggle source
Read a QueryStream
and yield its rows.
@yield [Hash] A single row from the QueryStream. Every row from the stream
will be yielded, but order is not guaranteed.
# File lib/pg_conduit/parallel_stream_reader.rb, line 18
#
# Consume the stream given at construction, passing each row to the block.
# Starts the reader thread first, then the workers, and waits for all of
# them to finish before returning.
#
# @yield [Hash] a single row from the stream; ordering is not guaranteed
# @return [Symbol] :ok once the reader and every worker have been joined
def read(&callback)
  producer = read_stream(@stream)
  consumers = dispatch_workers(&callback)

  producer.join
  consumers.each(&:join)

  :ok
end
Private Instance Methods
dispatch_worker(&callback)
click to toggle source
# File lib/pg_conduit/parallel_stream_reader.rb, line 39
#
# Start one worker thread that keeps calling process_next with the callback.
# The worker stops once the queue has been closed and the latest call to
# process_next reported that it handled no row.
#
# @return [Thread] the worker thread
def dispatch_worker(&callback)
  Thread.new do
    loop do
      handled = process_next(&callback)
      # Keep going while a row was handled or the queue is still open.
      break unless handled || !@queue.closed?
    end
  end
end
dispatch_workers(&callback)
click to toggle source
# File lib/pg_conduit/parallel_stream_reader.rb, line 35
#
# Start @workers worker threads, each sharing the same callback.
#
# @return [Array<Thread>] one entry per dispatch_worker call
def dispatch_workers(&callback)
  1.upto(@workers).map { dispatch_worker(&callback) }
end
process_next(&callback)
click to toggle source
# File lib/pg_conduit/parallel_stream_reader.rb, line 48
#
# Take the next row off the work queue and pass it to the callback.
#
# The callback runs in the calling (worker) thread. Spawning and joining a
# fresh thread per row adds thread-creation cost without any concurrency,
# since the caller waits for it anyway.
#
# @yield [row] the dequeued row
# @return [Boolean] true when a row was dequeued and handed to the callback,
#   false when the dequeue produced nothing
def process_next(&callback)
  row = @queue.deq
  return false unless row

  callback.call(row)
  true
end
read_stream(query_stream)
click to toggle source
# File lib/pg_conduit/parallel_stream_reader.rb, line 28
#
# Start a thread that pushes every row of the stream onto the work queue and
# closes the queue afterwards.
#
# The queue is closed in an ensure clause so that it is closed even when
# each_row raises; otherwise anything waiting on the queue would never see it
# close. The exception itself is not swallowed and still terminates the thread.
#
# @param query_stream [#each_row] source of rows
# @return [Thread] the reader thread
def read_stream(query_stream)
  Thread.new do
    begin
      query_stream.each_row { |row| @queue << row }
    ensure
      @queue.close
    end
  end
end