class Fluent::MysqlAppenderMultiInput

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_mysql_appender_multi.rb, line 10
# Loads the libraries this plugin relies on, then hands over to the
# superclass constructor.
#
# The requires are issued here (at instantiation time) rather than at file
# load time, in the order listed.
def initialize
  %w[mysql2 time yaml td td-client].each { |library| require library }
  super
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_mysql_appender_multi.rb, line 29
# Validates the plugin configuration after the superclass has processed it.
#
# Normalizes @interval through Config.time_value, then checks the mandatory
# settings one by one.
#
# @param conf the configuration object passed on to the superclass
# @raise [Fluent::ConfigError] when 'yaml_path' is unset, when the file it
#   names does not exist, or when 'tag' is unset
def configure(conf)
  super
  @interval = Config.time_value(@interval)

  raise Fluent::ConfigError, "mysql_appender_multi: missing 'yaml_path' parameter." if @yaml_path.nil?

  raise Fluent::ConfigError, "mysql_appender_multi: No such file in 'yaml_path'." unless File.exist?(@yaml_path)

  raise Fluent::ConfigError, "mysql_appender_multi: missing 'tag' parameter. Please add following line into config like 'tag appender.${name}.${event}.${primary_key}'" if @tag.nil?
end
format_tag(config) click to toggle source
# File lib/fluent/plugin/in_mysql_appender_multi.rb, line 148
# Builds the tag under which rows of one table are emitted.
#
# The result is "<tag>.<table_name>", or "<tag>.<TD_DATABASE>.<table_name>"
# when the TD_DATABASE environment variable is set.
#
# @param config [Hash] per-table settings; only 'table_name' is read here
# @return [String] the tag for this table
def format_tag(config)
  db_prefix = ''
  db_prefix = ENV['TD_DATABASE'] + '.' if ENV.key?('TD_DATABASE')
  table = config['table_name']
  "#{tag}.#{db_prefix}#{table}"
end
get_connection() click to toggle source
# File lib/fluent/plugin/in_mysql_appender_multi.rb, line 153
# Opens a new MySQL connection using the plugin's connection settings.
#
# When Mysql2::Error is raised, the error is logged as a warning, the method
# sleeps for @interval and then tries again; it keeps retrying until a
# connection is established.
#
# @return [Mysql2::Client] a freshly created client
def get_connection
  client_options = {
    host: @host,
    port: @port,
    username: @username,
    password: @password,
    database: @database,
    encoding: @encoding,
    reconnect: true,
    stream: true,
    cache_rows: false
  }
  Mysql2::Client.new(client_options)
rescue Mysql2::Error => e
  $log.warn "mysql_appender_multi: #{e}"
  sleep @interval
  retry
end
get_lastid(config) click to toggle source
# File lib/fluent/plugin/in_mysql_appender_multi.rb, line 106
# Looks up the highest primary-key value already stored in Treasure Data for
# the table described by +config+, so polling can resume after it.
#
# @param config [Hash] per-table settings; reads 'table_name' and
#   'primary_key'
# @return [Object, Integer, nil]
#   * -1 when any of TD_APIKEY / TD_ENDPOINT / TD_DATABASE is not set,
#     when the table is not found in TD_DATABASE, or when the table exists
#     but MAX(primary_key) is NULL (no rows yet);
#   * the first column of the first result row otherwise;
#   * nil when the lookup fails with an exception (which is logged) or the
#     job yields no rows at all.
def get_lastid(config)
  unless ENV.key?('TD_APIKEY') && ENV.key?('TD_ENDPOINT') && ENV.key?('TD_DATABASE')
    return -1
  end

  td_database = ENV['TD_DATABASE']
  table_name = config['table_name']
  cln = TreasureData::Client.new(ENV['TD_APIKEY'], {:endpoint => "https://" + ENV['TD_ENDPOINT']})

  # any? stops at the first match, so the remaining databases and tables are
  # not listed once the target table has been found.
  table_exists = cln.databases.any? do |db|
    db.tables.any? { |tbl| tbl.db_name == td_database && tbl.table_name == table_name }
  end

  unless table_exists
    $log.info "mysql_appender_multi: #{ENV['TD_DATABASE']}.#{config['table_name']} is not found. "
    return -1
  end

  query = "SELECT MAX(#{config['primary_key']}) FROM #{config['table_name']}"
  job = cln.query(td_database, query, nil, nil, nil, {:type => :presto})
  until job.finished?
    sleep 2
    job.update_progress!
  end
  job.update_status!  # get latest info

  job.result_each do |row|
    last_id = row.first
    # MAX() over an empty table is NULL; treat that like a missing table so
    # the caller does not interpolate nil into its SQL.
    last_id = -1 if last_id.nil?
    $log.info  "mysql_appender_multi: #{ENV['TD_DATABASE']}.#{config['table_name']}'s last_id is #{last_id} "
    return last_id
  end

  # The job produced no rows; there is nothing to resume from.
  nil
rescue => e
  $log.warn "mysql_appender_multi: failed to get lastid. #{config}"
  $log.error "error: #{e.message}"
  $log.error e.backtrace.join("\n")
  # Explicit failure value instead of whatever the logger call returns.
  nil
end
get_query(config, last_id) click to toggle source
# File lib/fluent/plugin/in_mysql_appender_multi.rb, line 144
# Builds the SELECT statement that fetches the next batch of rows.
#
# Values are interpolated directly into the SQL text, so +config+ is
# expected to come from a trusted source.
#
# @param config [Hash] per-table settings; reads 'columns' (joined with
#   commas), 'table_name', 'primary_key' and 'limit'
# @param last_id the primary-key value after which rows are selected
# @return [String] the SQL query, ordered ascending by primary key
def get_query(config, last_id)
  column_list = config['columns'].join(",")
  table = config['table_name']
  key = config['primary_key']
  row_limit = config['limit']
  "SELECT #{column_list} FROM #{table} where #{key} > #{last_id} order by #{key} asc limit #{row_limit}"
end
get_time(in_time) click to toggle source
# File lib/fluent/plugin/in_mysql_appender_multi.rb, line 173
# Coerces a column value into a Time.
#
# @param in_time a Time, or any object whose #to_s output Time.parse accepts
# @return [Time] +in_time+ itself when it already is a Time, otherwise the
#   result of parsing its string form
def get_time(in_time)
  return in_time if in_time.kind_of?(Time)

  Time.parse(in_time.to_s)
end
poll(config) click to toggle source
# File lib/fluent/plugin/in_mysql_appender_multi.rb, line 68
# Polls one MySQL table forever and emits every new row as an event.
#
# Each iteration opens a connection, runs the query built by get_query for
# rows after +last_id+, emits them, closes the connection and sleeps for
# @interval. The starting +last_id+ comes from get_lastid.
#
# Within a batch a row is emitted only if it is the first row emitted in
# that batch or its primary key is exactly last_id + 1; other rows are
# skipped, and since last_id only advances on emit, so is the rest of the
# batch.
#
# Any StandardError ends the loop: it is logged and the method returns.
#
# @param config [Hash] per-table settings; reads 'primary_key' and
#   'time_column' here and is passed on to format_tag, get_lastid and
#   get_query
def poll(config)
  emit_tag = format_tag(config)
  @mutex.synchronize {
    $log.info "mysql_appender_multi: polling start. :tag=>#{emit_tag}"
  }
  last_id = get_lastid(config)
  loop do
    rows_count = 0
    start_time = Time.now
    db = get_connection
    begin
      db.query(get_query(config, last_id)).each do |row|
        next unless rows_count == 0 || (last_id.to_i + 1) == row[config['primary_key']]

        # Without a configured time column the event is stamped with the
        # engine's current time.
        td_time =
          if config['time_column'].nil?
            Engine.now
          else
            get_time(row[config['time_column']]).to_i
          end
        # Stringify temporal and decimal values before emitting the record.
        row.each { |k, v| row[k] = v.to_s if v.is_a?(Time) || v.is_a?(Date) || v.is_a?(BigDecimal) }
        router.emit(emit_tag, td_time, row)
        rows_count += 1
        last_id = row[config['primary_key']]
      end
    ensure
      # Release the connection even when the query or row handling raises,
      # or when this thread is killed mid-iteration.
      db.close
    end
    elapsed_time = sprintf("%0.02f", Time.now - start_time)
    @mutex.synchronize {
      $log.info "mysql_appender_multi: finished execution :tag=>#{emit_tag} :rows_count=>#{rows_count} :last_id=>#{last_id} :elapsed_time=>#{elapsed_time} sec"
    }
    sleep @interval
  end
rescue => e
  # NOTE(review): masked_config is not defined in the visible part of this
  # file — confirm it exists, otherwise this line raises NameError and hides e.
  $log.error "mysql_appender_multi: failed to execute query. :config=>#{masked_config}"
  $log.error "error: #{e.message}"
  $log.error e.backtrace.join("\n")
end
shutdown() click to toggle source
# File lib/fluent/plugin/in_mysql_appender_multi.rb, line 62
# Stops the plugin by killing every polling thread held in @threads.
def shutdown
  @threads.each { |worker| Thread.kill(worker) }
end
start() click to toggle source
# File lib/fluent/plugin/in_mysql_appender_multi.rb, line 46
# Starts one polling thread per entry of the YAML file at @yaml_path.
#
# Resets @threads and creates the @mutex used by the pollers. When the file
# yields no entries (including a completely empty file) an error is logged
# and no thread is started. Any StandardError raised while loading the file
# or starting threads is logged and not re-raised.
def start
  @threads = []
  @mutex = Mutex.new
  # An empty YAML document loads as a falsy value; treat it as "no entries"
  # so the empty-configuration message below is the one that gets logged.
  table_configs = YAML.load_file(@yaml_path) || []
  table_configs.each do |config|
    @threads << Thread.new { poll(config) }
  end
  $log.error "mysql_appender_multi: stop working due to empty configuration" if @threads.empty?
rescue => e
  $log.error "error: #{e.message}"
  $log.error e.backtrace.join("\n")
end