class Fluent::Plugin::PostgresOutput

Attributes

handler[RW]

Public Instance Methods

client() click to toggle source
# File lib/fluent/plugin/out_postgres.rb, line 65
def client
  PG::Connection.new({
    :host => @host, :port => @port,
    :user => @username, :password => @password,
    :dbname => @database
  })
end
configure(conf) click to toggle source

We don't currently support mysql's analogous json format

Calls superclass method
# File lib/fluent/plugin/out_postgres.rb, line 25
def configure(conf)
  compat_parameters_convert(conf, :inject)
  super

  if @format == 'json'
    @format_proc = Proc.new{|tag, time, record| record.to_json}
  else
    @key_names = @key_names.split(/\s*,\s*/)
    @format_proc = Proc.new{|tag, time, record| @key_names.map{|k| record[k]}}
  end

  if @columns.nil? and @sql.nil?
    raise Fluent::ConfigError, "columns or sql MUST be specified, but missing"
  end
  if @columns and @sql
    raise Fluent::ConfigError, "both of columns and sql are specified, but specify one of them"
  end
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_postgres.rb, line 52
def format(tag, time, record)
  record = inject_values_to_record(tag, time, record)
  [tag, time, @format_proc.call(tag, time, record)].to_msgpack
end
formatted_to_msgpack_binary?() click to toggle source
# File lib/fluent/plugin/out_postgres.rb, line 61
def formatted_to_msgpack_binary?
  true
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_postgres.rb, line 57
def multi_workers_ready?
  true
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_postgres.rb, line 48
def shutdown
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_postgres.rb, line 44
def start
  super
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_postgres.rb, line 73
def write(chunk)
  handler = self.client
  handler.prepare("write", @sql)
  chunk.msgpack_each { |tag, time, data|
    handler.exec_prepared("write", data)
  }
  handler.close
end