class Fluent::Plugin::PostgresBulkOutput

Constants

N_PARAMETER_MAX

Upper bound on the number of bind parameters in a single statement; the number of parameters must be between 0 and 65535.

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_postgres_bulk.rb, line 26
# Sets up the plugin instance by delegating to the superclass initializer,
# then loads the 'pg' library. The require happens here rather than at file
# load time, so the library is only loaded once an instance is created.
def initialize
  super
  require 'pg'
end

Public Instance Methods

client() click to toggle source
# File lib/fluent/plugin/out_postgres_bulk.rb, line 31
# Opens a PostgreSQL connection from the plugin's connection settings
# (@host, @port, @database, @username, @password).
#
# Nothing is cached here: every call asks PG.connect for a connection, and
# the caller is responsible for closing whatever is returned.
#
# @return the value returned by PG.connect
def client
  connection_settings = {
    host: @host,
    port: @port,
    dbname: @database,
    user: @username,
    password: @password
  }
  PG.connect(**connection_settings)
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_postgres_bulk.rb, line 41
# Always answers true.
# NOTE(review): presumably the Fluentd core consults this predicate to decide
# whether the plugin may run under multiple workers — confirm against the
# Fluentd plugin API.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_postgres_bulk.rb, line 45
# Inserts the records of +chunk+ into @table with multi-row INSERT statements.
#
# The flattened values from build_values are cut into batches that bind at
# most N_PARAMETER_MAX parameters each and always end on a row boundary. One
# statement is executed and one log line is written per batch. The connection
# obtained from #client is closed when the method exits, including on error.
#
# NOTE(review): the batches are not wrapped in a transaction, so an error in a
# later batch leaves earlier batches inserted — confirm this is acceptable if
# the caller retries the whole chunk.
#
# @param chunk the buffered events, passed straight to build_values
def write(chunk)
  handler = client
  values = build_values(chunk)
  column_count = @column_names.length
  # Largest multiple of the column count that fits within the parameter
  # limit, so a batch never splits a row.
  max_slice = N_PARAMETER_MAX / column_count * column_count
  columns = @column_names.join(',')
  values.each_slice(max_slice) do |slice|
    place_holders = build_place_holders(slice)
    query = "INSERT INTO #{@table} (#{columns}) VALUES #{place_holders}"
    t = Benchmark.realtime { handler.exec_params(query, slice) }
    # Report the rows written by this statement, not the size of the whole chunk.
    log.info("(table: #{@table}) inserted #{slice.length / column_count} records in #{(t * 1000).to_i} ms")
  end
ensure
  handler&.close
end

Private Instance Methods

build_place_holders(values) click to toggle source
# File lib/fluent/plugin/out_postgres_bulk.rb, line 74
# Renders the VALUES clause placeholders for a flat list of bind values.
#
# The values are grouped into rows of @column_names.length entries. Each row
# becomes a parenthesised, comma-separated list of positional markers
# ($1, $2, ...), and the rows themselves are joined with commas.
#
# @param values [Array] flat bind values, row after row
# @return [String] e.g. "($1,$2),($3,$4)" for four values and two columns
def build_place_holders(values)
  row_width = @column_names.length
  rows = values.each_slice(row_width).each_with_index.map do |row, row_index|
    # Markers are 1-based; the offset is derived from this row's own length.
    offset = row_index * row.length
    markers = (1..row.length).map { |position| "$#{offset + position}" }
    "(#{markers.join(',')})"
  end
  rows.join(',')
end
build_values(chunk) click to toggle source
# File lib/fluent/plugin/out_postgres_bulk.rb, line 63
# Flattens the records of +chunk+ into one array of bind values.
#
# For each (time, record) pair yielded by chunk.each, the record is read
# once per entry of @column_names, in that order, and the results are
# appended to the output. The time component is ignored.
#
# @param chunk [#each] yields (time, record) pairs
# @return [Array] the values of all records, row after row
def build_values(chunk)
  flattened = []
  chunk.each do |_time, record|
    @column_names.each { |column| flattened << record[column] }
  end
  flattened
end