class PgConduit::Pipe
Public Class Methods
new(from:, to:)
click to toggle source
@example
Pipe.new(from: query_stream, to: db_writer)
    .read('SELECT name FROM users')
    .transform { |user| %(INSERT INTO friends (name) VALUES ('#{user["name"]}')) }
    .write
# File lib/pg_conduit/pipe.rb, line 10
# Sets up a pipe between a source and a destination.
#
# @param from [Object] source stream; kept as @stream and also wrapped in a
#   ParallelStreamReader, which is what rows are later read from
# @param to [Object] destination; kept as @writer
def initialize(from:, to:)
  # No transformation steps are registered until the caller adds some.
  @transformers = []
  @writer = to
  @stream = from
  @reader = ParallelStreamReader.new(from)
end
Public Instance Methods
peak() { |row| ... }
click to toggle source
# File lib/pg_conduit/pipe.rb, line 29
# Registers a pass-through step: each row reaching this point in the chain is
# yielded to the block and then handed on unchanged. The block's return value
# is discarded.
#
# @yieldparam row [Object] the row as it stands at this point in the chain
# @return [self] so calls can be chained
# @raise [ArgumentError] if no block is given
def peak(&observer)
  # Fail at registration time rather than with a LocalJumpError raised from
  # inside the stored lambda when the first row is processed.
  raise ArgumentError, 'peak requires a block' unless observer

  @transformers << ->(row) { row.tap(&observer) }
  self
end
read(query)
click to toggle source
# File lib/pg_conduit/pipe.rb, line 17
# Passes the query on to the source stream by calling @stream.query.
#
# @param query [Object] forwarded as-is to @stream.query (presumably SQL
#   text, as in the class example — confirm against the stream class)
# @return [self] so calls can be chained; the result of @stream.query is
#   discarded
def read(query)
  @stream.query(query)
  self
end
transform(&transformer)
click to toggle source
# File lib/pg_conduit/pipe.rb, line 21
# Appends a transformation step. Steps are applied to every row in the order
# they were registered, each receiving the previous step's result.
#
# @yieldparam row [Object] the row as produced by the previous step
# @yieldreturn [Object] the value handed to the next step
# @return [self] so calls can be chained
# @raise [ArgumentError] if no block is given
def transform(&transformer)
  # Without this guard a nil would be stored and only blow up later with a
  # NoMethodError (nil.call) while rows are being processed.
  raise ArgumentError, 'transform requires a block' unless transformer

  @transformers << transformer
  self
end
write()
click to toggle source
# File lib/pg_conduit/pipe.rb, line 25
# Runs the pipe row by row: every row produced by exec_read is run through
# exec_transform, and the result is supplied to exec_write as its block value.
#
# @return [Object] whatever exec_read returns
def write
  exec_read do |record|
    exec_write { exec_transform(record) }
  end
end
Also aliased as: exec
write_batched(size: 1000) { |rows| ... }
click to toggle source
# File lib/pg_conduit/pipe.rb, line 33
# Runs the pipe in batches: transformed rows are gathered by a RowCollector
# and each collected chunk is yielded to the block; the block's result is
# supplied to exec_write as its block value.
#
# @param size [Integer] passed to RowCollector as chunk_size
# @yieldparam rows [Object] one chunk of transformed rows from the collector
# @return [Object] whatever RowCollector#finish returns
# @raise [ArgumentError] if no block is given
def write_batched(size: 1000)
  # Check up front: otherwise the missing block is only noticed on the first
  # chunk, after rows have already been read and transformed.
  raise ArgumentError, 'write_batched requires a block' unless block_given?

  collector = RowCollector.new(chunk_size: size)

  # Each chunk reported by the collector goes to the caller's block.
  collector.on_chunk { |rows| exec_write { yield rows } }

  # Feed every transformed row into the collector.
  exec_read { |row| collector << exec_transform(row) }

  # Let the collector hand over any rows still pending.
  collector.finish
end
Private Instance Methods
exec_read(&b)
click to toggle source
# File lib/pg_conduit/pipe.rb, line 50
# Hands the given block straight to @reader.read.
#
# @return [Object] whatever @reader.read returns
def exec_read(&block)
  @reader.read(&block)
end
exec_transform(row)
click to toggle source
# File lib/pg_conduit/pipe.rb, line 58
# Runs a row through every registered transformer in registration order,
# feeding each step's result into the next.
#
# @param row [Object] the starting value
# @return [Object] the result of the last step, or the row itself when no
#   transformers are registered
def exec_transform(row)
  result = row
  @transformers.each { |step| result = step.call(result) }
  result
end
exec_write(&b)
click to toggle source
# File lib/pg_conduit/pipe.rb, line 54
# Hands the given block straight to @writer.write.
#
# @return [Object] whatever @writer.write returns
def exec_write(&block)
  @writer.write(&block)
end