class PgConduit::Pipe

Public Class Methods

new(from:, to:) click to toggle source

@example

Pipe
  .new(from: query_stream, to: db_writer)
  .read('SELECT name FROM users')
  .transform do |user|
    %(INSERT INTO friends (name) VALUES ('#{user["name"]}'))
  end
  .write
# File lib/pg_conduit/pipe.rb, line 10
# Builds a pipe between a source stream and a destination writer.
#
# @param from the source stream; it is wrapped in a ParallelStreamReader
#   here and later receives +#query+ (see #read)
# @param to the destination; its +#write+ is invoked with a block for
#   every write
def initialize(from:, to:)
  @stream, @writer = from, to
  # Transformers are applied to each row in the order they were registered.
  @transformers = []
  @reader = ParallelStreamReader.new(from)
end

Public Instance Methods

exec()
Alias for: write
peak() { |row| ... } click to toggle source
# File lib/pg_conduit/pipe.rb, line 29
# Registers a pass-through step that hands each row to the given block
# without altering the row flowing through the pipe. The block's return
# value is ignored.
#
# @yield [row] the row as it looks at this point in the transformer chain
# @return [self] so that calls can be chained
# @raise [ArgumentError] if no block is given
def peak
  # Fail at registration time: without a block the stored lambda would only
  # blow up (LocalJumpError) later, while rows are already being streamed.
  raise ArgumentError, 'peak requires a block' unless block_given?

  # `row.tap` guarantees the original row is returned whatever the block
  # returns.
  @transformers << ->(row) { row.tap { yield row } }
  self
end
read(query) click to toggle source
# File lib/pg_conduit/pipe.rb, line 17
# Hands the query to the source stream.
#
# @param query passed unchanged to +@stream.query+
# @return [self] so that calls can be chained
def read(query)
  @stream.query(query)
  self
end
transform(&transformer) click to toggle source
# File lib/pg_conduit/pipe.rb, line 21
# Appends a transformer to the chain. Each transformer receives the output
# of the previous one and its return value is passed on to the next.
#
# @yield [row] the current row value
# @return [self] so that calls can be chained
# @raise [ArgumentError] if no block is given
def transform(&transformer)
  # Without this guard a nil would be stored and only surface as a
  # NoMethodError on nil.call once rows are being transformed.
  raise ArgumentError, 'transform requires a block' unless transformer

  @transformers << transformer
  self
end
write() click to toggle source
# File lib/pg_conduit/pipe.rb, line 25
# Runs the pipe row by row: for every row produced by the reader, the
# writer is invoked with a block that evaluates to the transformed row.
#
# @return whatever #exec_read returns
def write
  exec_read do |row|
    # The transformation happens inside the writer's block, so it is only
    # evaluated when the writer calls that block.
    exec_write { exec_transform(row) }
  end
end
Also aliased as: exec
write_batched(size: 1000) { |rows| ... } click to toggle source
# File lib/pg_conduit/pipe.rb, line 33
# Runs the pipe in batches: transformed rows are gathered by a RowCollector
# and each collected chunk is yielded to the given block from inside a
# writer call, so the block's return value is what the writer receives.
#
# @param size passed to RowCollector as +chunk_size+ (default 1000)
# @yield [rows] a chunk of transformed rows
# @return whatever +RowCollector#finish+ returns
# @raise [ArgumentError] if no block is given
def write_batched(size: 1000)
  # Fail before any rows are read: without a block the first chunk flush
  # would raise LocalJumpError after work has already been done.
  raise ArgumentError, 'write_batched requires a block' unless block_given?

  batcher = RowCollector.new(chunk_size: size)

  # Every chunk the collector hands back goes through the writer.
  batcher.on_chunk do |rows|
    exec_write { yield rows }
  end

  # Feed each transformed row into the collector.
  exec_read { |row| batcher << exec_transform(row) }

  # Flush any rows left over after the read completes.
  batcher.finish
end

Private Instance Methods

exec_read(&b) click to toggle source
# File lib/pg_conduit/pipe.rb, line 50
# Delegates reading to the reader, forwarding the caller's block unchanged.
#
# @return whatever +@reader.read+ returns
def exec_read(&block)
  @reader.read(&block)
end
exec_transform(row) click to toggle source
# File lib/pg_conduit/pipe.rb, line 58
# Runs the row through every registered transformer in registration order,
# feeding each transformer the previous one's output.
#
# @param row the value handed to the first transformer
# @return the last transformer's output, or +row+ itself when no
#   transformers are registered
def exec_transform(row)
  current = row
  @transformers.each { |step| current = step.call(current) }
  current
end
exec_write(&b) click to toggle source
# File lib/pg_conduit/pipe.rb, line 54
# Delegates writing to the writer, forwarding the caller's block unchanged.
#
# @return whatever +@writer.write+ returns
def exec_write(&block)
  @writer.write(&block)
end