class Fluent::MysqlOutput

Attributes

handler[RW]

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_mysql.rb, line 28
def initialize
  super
  require 'mysql2-cs-bind'
  require 'jsonpath'
end

Public Instance Methods

client() click to toggle source
# File lib/fluent/plugin/out_mysql.rb, line 104
def client
  Mysql2::Client.new({
      :host => @host, :port => @port,
      :username => @username, :password => @password,
      :database => @database,
      :sslkey => @sslkey,
      :sslcert => @sslcert,
      :sslca => @sslca,
      :sslcapath => @sslcapath,
      :sslcipher => @sslcipher,
      :sslverify => @sslverify,
      :flags => Mysql2::Client::MULTI_STATEMENTS,
    })
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_mysql.rb, line 39
def configure(conf)
  super

  log.warn "[mysql] This plugin deprecated. You should use mysql_bulk."

  # TODO tag_mapped

  case @format
  when 'json'
    @format_proc = Proc.new{|tag, time, record| record.to_json}
  when 'jsonpath'
    @key_names = @key_names.split(/\s*,\s*/)
    @format_proc = Proc.new do |tag, time, record|
      json = record.to_json
      @key_names.map do |k|
        JsonPath.new(k.strip).on(json).first
      end
    end
  else
    @key_names = @key_names.split(/\s*,\s*/)
    @format_proc = Proc.new{|tag, time, record| @key_names.map{|k| record[k]}}
  end

  if @columns.nil? and @sql.nil?
    raise Fluent::ConfigError, "columns or sql MUST be specified, but missing"
  end
  if @columns and @sql
    raise Fluent::ConfigError, "both of columns and sql are specified, but specify one of them"
  end

  if @sql
    begin
      if @format == 'json'
        Mysql2::Client.pseudo_bind(@sql, [nil])
      else
        Mysql2::Client.pseudo_bind(@sql, @key_names.map{|n| nil})
      end
    rescue ArgumentError => e
      raise Fluent::ConfigError, "mismatch between sql placeholders and key_names"
    end
  else # columns
    raise Fluent::ConfigError, "table missing" unless @table
    @columns = @columns.split(/\s*,\s*/)
    cols = @columns.join(',')
    placeholders = if @format == 'json'
                     '?'
                   else
                     @key_names.map{|k| '?'}.join(',')
                   end
    @sql = "INSERT INTO #{@table} (#{cols}) VALUES (#{placeholders})"
  end
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_mysql.rb, line 100
def format(tag, time, record)
  [tag, time, @format_proc.call(tag, time, record)].to_msgpack
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_mysql.rb, line 96
def shutdown
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_mysql.rb, line 92
def start
  super
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_mysql.rb, line 119
def write(chunk)
  handler = self.client
  chunk.msgpack_each { |tag, time, data|
    handler.xquery(@sql, data)
  }
  handler.close
end