class Fluent::MysqlPreparedStatementOutput

Attributes

handler[RW]

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_mysql_prepared_statement.rb, line 22
def initialize
  super
  require 'mysql2-cs-bind'
end

Public Instance Methods

client() click to toggle source
# File lib/fluent/plugin/out_mysql_prepared_statement.rb, line 62
def client
  Mysql2::Client.new({
      :host => @host,
      :port => @port,
      :username => @username,
      :password => @password,
      :database => @database,
      :flags => Mysql2::Client::MULTI_STATEMENTS
    })
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_mysql_prepared_statement.rb, line 27
def configure(conf)
  super

  if @sql.nil?
    raise Fluent::ConfigError, "sql MUST be specified, but missing"
  end

  if @key_names.nil?
    raise Fluent::ConfigError, "key_names MUST be specified, but missing"
  end

  @key_names = @key_names.split(',')
  @format_proc = Proc.new{|tag, time, record| @key_names.map{|k| record[k]}}

  begin
    Mysql2::Client.pseudo_bind(@sql, @key_names.map{|n| nil})
  rescue ArgumentError => e
    raise Fluent::ConfigError, "mismatch between sql placeholders and key_names"
  end

  $log.info "sql ->[#{@sql}]"
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_mysql_prepared_statement.rb, line 58
def format(tag, time, record)
  [tag, time, @format_proc.call(tag, time, record)].to_msgpack
end
get_exec_result(data) click to toggle source
# File lib/fluent/plugin/out_mysql_prepared_statement.rb, line 85
def get_exec_result(data)
  results = Array.new
  stmt = @handler.xquery(@sql, data)
  return results if stmt.nil?
  stmt.each do |row|
    results.push(row)
  end
  return results
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_mysql_prepared_statement.rb, line 54
def shutdown
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_mysql_prepared_statement.rb, line 50
def start
  super
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_mysql_prepared_statement.rb, line 73
def write(chunk)
  @handler = client
  $log.info "adding mysql_query job: "
  chunk.msgpack_each { |tag, time, data|
    results = get_exec_result(data)
    results.each{|result|
      router.emit(@output_tag, Fluent::Engine.now, result)
    }
  }
  @handler.close
end