class Fluent::Plugin::PostgresBulkOutput
Constants
- N_PARAMETER_MAX
Upper bound on bind parameters per statement: the number of parameters must be between 0 and 65535.
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_postgres_bulk.rb, line 26 def initialize super require 'pg' end
Public Instance Methods
client()
click to toggle source
# File lib/fluent/plugin/out_postgres_bulk.rb, line 31
# Opens a connection to PostgreSQL from the plugin's connection settings
# (@host, @port, @database, @username, @password).
#
# Nothing is cached: every call asks PG for a new connection, so the caller
# owns the returned object and is responsible for closing it.
#
# @return whatever PG.connect returns (presumably a PG::Connection)
def client
  connection_settings = {
    host: @host,
    port: @port,
    dbname: @database,
    user: @username,
    password: @password
  }
  PG.connect(**connection_settings)
end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_postgres_bulk.rb, line 41 def multi_workers_ready? true end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_postgres_bulk.rb, line 45
# Inserts every record of a buffer chunk into @table using multi-row,
# parameterized INSERT statements.
#
# The flat value list produced by build_values is cut into batches of at most
# N_PARAMETER_MAX bind parameters, rounded down to a whole number of rows, and
# each batch is sent as one statement. A new connection is obtained from
# #client on every call and is always closed afterwards, even when an insert
# raises.
#
# NOTE(review): no explicit transaction is opened here, so presumably batches
# that already succeeded stay inserted if a later batch fails -- confirm
# whether a retried chunk can produce duplicate rows.
#
# @param chunk buffer chunk; handed unchanged to build_values
# @return not meaningful to callers (result of the batch iteration)
# @raise whatever the connection raises while connecting or executing
def write(chunk)
  handler = client
  values = build_values(chunk)
  column_count = @column_names.length
  # Largest multiple of the column count that fits into one statement, so a
  # batch never ends in the middle of a row.
  max_slice = N_PARAMETER_MAX / column_count * column_count
  column_list = @column_names.join(',')

  values.each_slice(max_slice) do |slice|
    place_holders = build_place_holders(slice)
    query = "INSERT INTO #{@table} (#{column_list}) VALUES #{place_holders}"
    t = Benchmark.realtime { handler.exec_params(query, slice) }
    # Count the rows of this batch only; the elapsed time is per batch, so
    # reporting the chunk-wide total here would overstate what was inserted.
    log.info("(table: #{table}) inserted #{slice.length / column_count} records in #{(t * 1000).to_i} ms")
  end
ensure
  # handler is nil when #client itself raised, hence the safe navigation.
  handler&.close
end
Private Instance Methods
build_place_holders(values)
click to toggle source
# File lib/fluent/plugin/out_postgres_bulk.rb, line 74
# Renders the VALUES placeholder list for a flat array of bind values.
#
# The values are grouped into rows of @column_names.length entries; each row
# becomes "($a,$b,...)" and the rows are joined with commas. Only the number
# of values matters, their content is never inspected.
#
# @param values [Array] flat list of bind values, row after row
# @return [String] e.g. "($1,$2),($3,$4)" for four values and two columns
def build_place_holders(values)
  rows = values.each_slice(@column_names.length).each_with_index.map do |row, row_index|
    # Numbering continues from the rows before this one; the offset is
    # derived from the current row's own length.
    offset = row_index * row.length
    markers = (1..row.length).map { |position| "$#{offset + position}" }
    "(#{markers.join(',')})"
  end
  rows.join(',')
end
build_values(chunk)
click to toggle source
# File lib/fluent/plugin/out_postgres_bulk.rb, line 63
# Flattens a chunk into a single array of bind values.
#
# For every (time, record) pair yielded by chunk.each, the record is looked
# up once per entry of @column_names, in that order, and the results are
# appended to the output. The time component is ignored, and a key the
# record does not have contributes whatever record[key] returns for it.
#
# @param chunk [#each] yields time and record for every event
# @return [Array] values of all records, row after row
def build_values(chunk)
  flattened = []
  chunk.each do |_time, record|
    row = @column_names.map { |column| record[column] }
    flattened.concat(row)
  end
  flattened
end