class Fluent::Plugin::MysqlBulkOutput

Attributes

handler[RW]

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_mysql_bulk.rb, line 59
def initialize
  super
  require 'mysql2-cs-bind'
end

Public Instance Methods

check_table_schema(database: @database, table: @table) click to toggle source
# File lib/fluent/plugin/out_mysql_bulk.rb, line 107
def check_table_schema(database: @database, table: @table)
  _client = client(database)
  result = _client.xquery("SHOW COLUMNS FROM #{table}")
  max_lengths = []
  @column_names.each do |column|
    info = result.select { |x| x['Field'] == column }.first
    r = /(char|varchar)\(([\d]+)\)/
    begin
      max_length = info['Type'].scan(r)[0][1].to_i
    rescue
      max_length = nil
    end
    max_lengths << max_length
  end
  max_lengths
ensure
  if not _client.nil? then _client.close end
end
client(database) click to toggle source
# File lib/fluent/plugin/out_mysql_bulk.rb, line 139
def client(database)
  Mysql2::Client.new(
      host: @host,
      port: @port,
      username: @username,
      password: @password,
      database: database,
      sslkey: @sslkey,
      sslcert: @sslcert,
      sslca: @sslca,
      sslcapath: @sslcapath,
      sslcipher: @sslcipher,
      sslverify: @sslverify,
      flags: Mysql2::Client::MULTI_STATEMENTS
    )
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_mysql_bulk.rb, line 64
    def configure(conf)
      compat_parameters_convert(conf, :buffer, :inject)
      super

      if @column_names.nil?
        fail Fluent::ConfigError, 'column_names MUST specified, but missing'
      end

      if @on_duplicate_key_update
        if @on_duplicate_update_keys.nil?
          fail Fluent::ConfigError, 'on_duplicate_key_update = true , on_duplicate_update_keys nil!'
        end
        @on_duplicate_update_keys = @on_duplicate_update_keys.split(',')

        if !@on_duplicate_update_custom_values.nil?
          @on_duplicate_update_custom_values = @on_duplicate_update_custom_values.split(',')
          if @on_duplicate_update_custom_values.length != @on_duplicate_update_keys.length
            fail Fluent::ConfigError, <<-DESC
on_duplicate_update_keys and on_duplicate_update_custom_values must be the same length
DESC
          end
        end

        @on_duplicate_key_update_sql = ' ON DUPLICATE KEY UPDATE '
        updates = []
        @on_duplicate_update_keys.each_with_index do |update_column, i|
          if @on_duplicate_update_custom_values.nil? || @on_duplicate_update_custom_values[i] == "#{update_column}"
            updates << "#{update_column} = VALUES(#{update_column})"
          else
            value = @on_duplicate_update_custom_values[i].to_s.match(/\${(.*)}/)[1]
            escape_value = Mysql2::Client.escape(value)
            updates << "#{update_column} = #{escape_value}"
          end
        end
        @on_duplicate_key_update_sql += updates.join(',')
      end

      @column_names = @column_names.split(',').collect(&:strip)
      @key_names = @key_names.nil? ? @column_names : @key_names.split(',').collect(&:strip)
      @json_key_names = @json_key_names.split(',') if @json_key_names
      @unixtimestamp_key_names = @unixtimestamp_key_names.split(',') if @unixtimestamp_key_names
    end
expand_placeholders(metadata) click to toggle source
# File lib/fluent/plugin/out_mysql_bulk.rb, line 156
def expand_placeholders(metadata)
  database = extract_placeholders(@database, metadata).gsub('.', '_')
  table = extract_placeholders(@table, metadata).gsub('.', '_')
  return database, table
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_mysql_bulk.rb, line 126
def format(tag, time, record)
  record = inject_values_to_record(tag, time, record)
  [tag, time, record].to_msgpack
end
formatted_to_msgpack_binary() click to toggle source
# File lib/fluent/plugin/out_mysql_bulk.rb, line 131
def formatted_to_msgpack_binary
  true
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_mysql_bulk.rb, line 135
def multi_workers_ready?
  true
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_mysql_bulk.rb, line 162
def write(chunk)
  database, table = expand_placeholders(chunk.metadata)
  max_lengths = check_table_schema(database: database, table: table)
  @handler = client(database)
  values = []
  values_template = "(#{ @column_names.map { |key| '?' }.join(',') })"
  chunk.msgpack_each do |tag, time, data|
    data = format_proc.call(tag, time, data, max_lengths)
    values << Mysql2::Client.pseudo_bind(values_template, data)
  end
  sql = "INSERT INTO #{table} (#{@column_names.map{|x| "`#{x.to_s.gsub('`', '``')}`"}.join(',')}) VALUES #{values.join(',')}"
  sql += @on_duplicate_key_update_sql if @on_duplicate_key_update

  log.info "bulk insert values size (table: #{table}) => #{values.size}"
  @handler.query("SET SESSION TRANSACTION ISOLATION LEVEL #{transaction_isolation_level}") if @transaction_isolation_level
  @handler.xquery(sql)
  @handler.close
end

Private Instance Methods

format_proc() click to toggle source
# File lib/fluent/plugin/out_mysql_bulk.rb, line 183
def format_proc
  proc do |tag, time, record, max_lengths|
    values = []
    @key_names.each_with_index do |key, i|
      if key == '${time}'
        value = Time.at(time).strftime('%Y-%m-%d %H:%M:%S')
      else
        if max_lengths[i].nil? || record[key].nil?
          value = record[key]
        else
          value = record[key].to_s.slice(0, max_lengths[i])
        end

        if @json_key_names && @json_key_names.include?(key)
          value = Oj.dump(value)
        end

        if @unixtimestamp_key_names && @unixtimestamp_key_names.include?(key)
          value = Time.at(value).strftime('%Y-%m-%d %H:%M:%S')
        end
      end
      values << value
    end
    values
  end
end
transaction_isolation_level() click to toggle source
# File lib/fluent/plugin/out_mysql_bulk.rb, line 210
def transaction_isolation_level
  case @transaction_isolation_level
  when :read_uncommitted
    "READ UNCOMMITTED"
  when :read_committed
    "READ COMMITTED"
  when :repeatable_read
    "REPEATABLE READ"
  when :serializable
    "SERIALIZABLE"
  end
end